diff options
author | sunil unnava <sunil.unnava@att.com> | 2018-12-05 13:00:13 -0500 |
---|---|---|
committer | sunil unnava <sunil.unnava@att.com> | 2018-12-05 13:03:06 -0500 |
commit | 2b80d1a99615392a791fa15f04085601f13fbaba (patch) | |
tree | 1b1778a65ca320377ed8cbf5f8deb534c4e919f8 | |
parent | 25010bd2b53ba0e888bdfa36c10acda3841fa5f5 (diff) |
Fix for Kafka Consumer is not safe error
Pre create a dummy topic and subscribe to it during first POST call for
posting a message to create __consumer_offsets topic
Issue-ID: DMAAP-896
Change-Id: I11f3f9b8764232bc7d4e9bb270d5d73fc280cb80
Signed-off-by: sunil unnava <sunil.unnava@att.com>
-rw-r--r-- | src/main/java/org/onap/dmaap/service/EventsRestService.java | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/src/main/java/org/onap/dmaap/service/EventsRestService.java b/src/main/java/org/onap/dmaap/service/EventsRestService.java index 2672261..b4aee10 100644 --- a/src/main/java/org/onap/dmaap/service/EventsRestService.java +++ b/src/main/java/org/onap/dmaap/service/EventsRestService.java @@ -96,6 +96,8 @@ public class EventsRestService { @Autowired private DMaaPErrorMessages errorMessages; + + private boolean isOffsetTopicCreated=false; /** * This method is used to consume messages.Taking three parameter @@ -254,7 +256,10 @@ public class EventsRestService { public void pushEvents(@PathParam("topic") String topic, InputStream msg, @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { log.info("Publishing message to topic " + topic); - + + if(!isOffsetTopicCreated){ + preCreateOffsetTopic(msg); + } try { eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null); } @@ -313,8 +318,14 @@ public class EventsRestService { @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { // log.info("Publishing message with transaction id for topic " + topic // ); + try { + + if(!isOffsetTopicCreated){ + preCreateOffsetTopic(request.getInputStream()); + } + eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey, Utils.getFormattedDate(new Date())); @@ -374,5 +385,18 @@ public class EventsRestService { return dmaapContext; } + + private void preCreateOffsetTopic(InputStream msg) { + + try { + eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null); + eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1"); + isOffsetTopicCreated = true; + } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException + | missingReqdSetting | UnavailableException e) { + log.error("Error while creating the dummy topic", e); + } + + } }
\ No newline at end of file |