diff options
-rw-r--r-- | src/main/java/org/onap/dmaap/service/EventsRestService.java | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/src/main/java/org/onap/dmaap/service/EventsRestService.java b/src/main/java/org/onap/dmaap/service/EventsRestService.java index 2672261..b4aee10 100644 --- a/src/main/java/org/onap/dmaap/service/EventsRestService.java +++ b/src/main/java/org/onap/dmaap/service/EventsRestService.java @@ -96,6 +96,8 @@ public class EventsRestService { @Autowired private DMaaPErrorMessages errorMessages; + + private boolean isOffsetTopicCreated=false; /** * This method is used to consume messages.Taking three parameter @@ -254,7 +256,10 @@ public class EventsRestService { public void pushEvents(@PathParam("topic") String topic, InputStream msg, @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { log.info("Publishing message to topic " + topic); - + + if(!isOffsetTopicCreated){ + preCreateOffsetTopic(msg); + } try { eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null); } @@ -313,8 +318,14 @@ public class EventsRestService { @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { // log.info("Publishing message with transaction id for topic " + topic // ); + try { + + if(!isOffsetTopicCreated){ + preCreateOffsetTopic(request.getInputStream()); + } + eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey, Utils.getFormattedDate(new Date())); @@ -374,5 +385,18 @@ public class EventsRestService { return dmaapContext; } + + private void preCreateOffsetTopic(InputStream msg) { + + try { + eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null); + eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1"); + isOffsetTopicCreated = true; + } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException + | missingReqdSetting | UnavailableException e) { + log.error("Error while creating the dummy topic", e); + } + + } }
\ No newline at end of file |