1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
package org.onap.policy.std;
import java.util.List;
import org.json.JSONObject;
import org.onap.policy.api.NotificationScheme;
import org.onap.policy.api.NotificationType;
import org.onap.policy.api.PDPNotification;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
import org.onap.policy.utils.BusConsumer;
import org.onap.policy.utils.BusPublisher;
import org.onap.policy.xacml.api.XACMLErrorConstants;
public class ManualClientEndDMAAP {
private static StdPDPNotification notification = null;
private static String resultJson = null;
private static Logger logger = FlexLogger.getLogger(ManualClientEndDMAAP.class.getName());
private static BusConsumer dmaapConsumer = null;
private static String uniquID = null;
private static String topic = null;
public static PDPNotification result(NotificationScheme scheme) {
if (resultJson == null || notification == null) {
logger.debug("No Result" );
return null;
} else {
if(scheme.equals(NotificationScheme.MANUAL_ALL_NOTIFICATIONS)) {
boolean removed = false, updated = false;
if(notification.getRemovedPolicies()!=null && !notification.getRemovedPolicies().isEmpty()){
removed = true;
}
if(notification.getLoadedPolicies()!=null && !notification.getLoadedPolicies().isEmpty()){
updated = true;
}
if(removed && updated) {
notification.setNotificationType(NotificationType.BOTH);
}else if(removed){
notification.setNotificationType(NotificationType.REMOVE);
}else if(updated){
notification.setNotificationType(NotificationType.UPDATE);
}
return notification;
}else if(scheme.equals(NotificationScheme.MANUAL_NOTIFICATIONS)) {
return MatchStore.checkMatch(notification);
}else {
return null;
}
}
}
private static void publishMessage(String pubTopic, String uniqueID, List<String> dmaapList, String aafLogin, String aafPassword) {
BusPublisher pub = null;
try {
pub = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
final JSONObject msg1 = new JSONObject ();
msg1.put ( "JSON", "DMaaP Update Request UID=" + uniqueID);
pub.send ( "MyPartitionKey", msg1.toString () );
} catch (Exception e) {
logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Publisher: ", e);
}
if(pub != null){
pub.close ();
}
}
//NOTE: should be able to remove this for DMAAP since we will not be creating topics dynamically
public static void createTopic (String topic, String uniquID, List<String> dmaapList, String aafLogin, String aafPassword){
ManualClientEndDMAAP.topic = topic;
publishMessage(topic, uniquID, dmaapList, aafLogin, aafPassword);
}
public static void start(List<String> dmaapList, String topic, String aafLogin, String aafPassword, String uniqueID) {
ManualClientEndDMAAP.uniquID = uniqueID;
ManualClientEndDMAAP.topic = topic;
String id = "0";
try {
dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, topic, aafLogin, aafPassword, "clientGroup", id, 15*1000, 1000);
} catch (Exception e) {
logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e);
}
int count = 1;
while (count < 4) {
publishMessage(topic, uniquID, dmaapList, aafLogin, aafPassword);
try {
for ( String msg : dmaapConsumer.fetch () )
{
logger.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : " + dmaapList.toString());
resultJson = msg;
if (!msg.contains("DMaaP Update")){
notification = NotificationUnMarshal.notificationJSON(msg);
count = 4;
}
}
}catch (Exception e) {
logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to fetch messages from DMaaP servers: ", e);
}
count++;
}
}
}
|