summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorsunil unnava <sunil.unnava@att.com>2018-12-05 13:00:13 -0500
committersunil unnava <sunil.unnava@att.com>2018-12-05 13:03:06 -0500
commit2b80d1a99615392a791fa15f04085601f13fbaba (patch)
tree1b1778a65ca320377ed8cbf5f8deb534c4e919f8 /src/main/java
parent25010bd2b53ba0e888bdfa36c10acda3841fa5f5 (diff)
Fix for Kafka Consumer is not safe error
Pre create a dummy topic and subscribe to it during first POST call for posting a message to create __consumer_offsets topic Issue-ID: DMAAP-896 Change-Id: I11f3f9b8764232bc7d4e9bb270d5d73fc280cb80 Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/dmaap/service/EventsRestService.java26
1 files changed, 25 insertions, 1 deletions
diff --git a/src/main/java/org/onap/dmaap/service/EventsRestService.java b/src/main/java/org/onap/dmaap/service/EventsRestService.java
index 2672261..b4aee10 100644
--- a/src/main/java/org/onap/dmaap/service/EventsRestService.java
+++ b/src/main/java/org/onap/dmaap/service/EventsRestService.java
@@ -96,6 +96,8 @@ public class EventsRestService {
@Autowired
private DMaaPErrorMessages errorMessages;
+
+ private boolean isOffsetTopicCreated=false;
/**
* This method is used to consume messages.Taking three parameter
@@ -254,7 +256,10 @@ public class EventsRestService {
public void pushEvents(@PathParam("topic") String topic, InputStream msg,
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
log.info("Publishing message to topic " + topic);
-
+
+ if(!isOffsetTopicCreated){
+ preCreateOffsetTopic(msg);
+ }
try {
eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
}
@@ -313,8 +318,14 @@ public class EventsRestService {
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
// log.info("Publishing message with transaction id for topic " + topic
// );
+
try {
+
+ if(!isOffsetTopicCreated){
+ preCreateOffsetTopic(request.getInputStream());
+ }
+
eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
partitionKey,
Utils.getFormattedDate(new Date()));
@@ -374,5 +385,18 @@ public class EventsRestService {
return dmaapContext;
}
+
+ private void preCreateOffsetTopic(InputStream msg) {
+
+ try {
+ eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null);
+ eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1");
+ isOffsetTopicCreated = true;
+ } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException
+ | missingReqdSetting | UnavailableException e) {
+ log.error("Error while creating the dummy topic", e);
+ }
+
+ }
} \ No newline at end of file