Skip to content

Commit

Permalink
Merge pull request #285 from bcgov/feature/GRAD2-2334
Browse files Browse the repository at this point in the history
GRAD2-2334: task is completed
  • Loading branch information
kamal-mohammed authored Sep 19, 2023
2 parents ea817f8 + a171648 commit e672e29
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package ca.bc.gov.educ.api.dataconversion.scheduler;

import ca.bc.gov.educ.api.dataconversion.choreographer.ChoreographEventHandler;
import ca.bc.gov.educ.api.dataconversion.entity.Event;
import ca.bc.gov.educ.api.dataconversion.repository.EventRepository;
import ca.bc.gov.educ.api.dataconversion.util.EducGradDataConversionApiConstants;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.LockAssert;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static ca.bc.gov.educ.api.dataconversion.constant.EventStatus.DB_COMMITTED;

/**
Expand All @@ -27,33 +27,42 @@ public class JetStreamEventScheduler {

private final ChoreographEventHandler choreographer;

private final EducGradDataConversionApiConstants constants;

/**
* Instantiates a new Stan event scheduler.
*
* @param eventRepository the event repository
* @param choreographer the choreographer
*/
public JetStreamEventScheduler(final EventRepository eventRepository,
final ChoreographEventHandler choreographer) {
final ChoreographEventHandler choreographer,
final EducGradDataConversionApiConstants constants) {
this.eventRepository = eventRepository;
this.choreographer = choreographer;
this.constants = constants;
}

@Scheduled(cron = "${cron.scheduled.process.events.stan.run}") // minimum = every 5 minute
@SchedulerLock(name = "PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.stan.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.stan.lockAtMostFor}")
public void findAndProcessEvents() {
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor());
LockAssert.assertLocked();
final var results = this.eventRepository.findAllByEventStatusOrderByCreateDate(DB_COMMITTED.toString());
if (!results.isEmpty()) {
results.stream()
.filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5)))
.forEach(el -> {
try {
choreographer.handleEvent(el);
} catch (final Exception ex) {
log.error("Exception while trying to handle message", ex);
}
});
int cnt = 0;
for (Event e : results) {
if (cnt++ >= constants.getTraxToGradProcessingThreshold()) {
log.info(" ==> Reached the processing threshold of {}", constants.getTraxToGradProcessingThreshold());
break;
}
try {
choreographer.handleEvent(e);
} catch (final Exception ex) {
log.error("Exception while trying to handle message", ex);
}
}
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: processing is completed");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,17 @@ public class EducGradDataConversionApiConstants {
// Splunk LogHelper Enabled
@Value("${splunk.log-helper.enabled}")
private boolean splunkLogHelperEnabled;

// Scheduler: ongoing updates from TRAX to GRAD
@Value("${cron.scheduled.process.events.stan.run}")
private String traxToGradCronRun;

@Value("${cron.scheduled.process.events.stan.lockAtLeastFor}")
private String traxToGradLockAtLeastFor;

@Value("${cron.scheduled.process.events.stan.lockAtMostFor}")
private String traxToGradLockAtMostFor;

@Value("${cron.scheduled.process.events.stan.threshold}")
private int traxToGradProcessingThreshold;
}
1 change: 1 addition & 0 deletions api/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ cron:
run: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN}
lockAtLeastFor: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_LOCK_AT_LEAST_FOR}
lockAtMostFor: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_LOCK_AT_MOST_FOR}
threshold: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_THRESHOLD}

#Endpoints
endpoint:
Expand Down
1 change: 1 addition & 0 deletions api/src/test/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ cron:
run: 0 0/5 * * * *
lockAtLeastFor: 800ms
lockAtMostFor: 900ms
threshold: 100

#Endpoints
endpoint:
Expand Down

0 comments on commit e672e29

Please sign in to comment.