Skip to content

Commit

Permalink
Merge pull request #294 from bcgov/feature/GRAD2-2423
Browse files Browse the repository at this point in the history
GRAD2-2423: task is complete.
  • Loading branch information
kamal-mohammed authored Dec 8, 2023
2 parents e93da0f + 9555803 commit eb7ae00
Show file tree
Hide file tree
Showing 19 changed files with 39 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -24,19 +25,20 @@
@Component
@Slf4j
public class ChoreographEventHandler {
private final Executor singleTaskExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("single-task-executor-%d").build())
.setCorePoolSize(1).setMaximumPoolSize(1).build();
private final Executor eventExecutor;
private final Map<String, EventService> eventServiceMap;

public ChoreographEventHandler(final List<EventService> eventServices) {
this.eventServiceMap = new HashMap<>();
this.eventExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("multi-task-executor-%d").build())
.setCorePoolSize(10).setMaximumPoolSize(20).setKeepAliveTime(Duration.ofSeconds(60)).build();
eventServices.forEach(eventService -> this.eventServiceMap.put(eventService.getEventType(), eventService));
}

public void handleEvent(@NonNull final Event event) {
//only one thread will process all the request. since RDB won't handle concurrent requests.
this.singleTaskExecutor.execute(() -> {
this.eventExecutor.execute(() -> {
try {
switch (event.getEventType()) {
case "NEWSTUDENT":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
@Slf4j
public class Subscriber {

private final Executor subscriberExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("jet-stream-subscriber-%d").build())
.setCorePoolSize(2).setMaximumPoolSize(2).setKeepAliveTime(Duration.ofMillis(1000)).build();
private final Executor subscriberExecutor;
private final EventHandlerDelegatorService eventHandlerDelegatorService;
private final Map<String, List<String>> streamTopicsMap = new HashMap<>(); // one stream can have multiple topics.
private final Connection natsConnection;
Expand All @@ -55,6 +53,9 @@ public Subscriber(final Connection natsConnection, final EventHandlerDelegatorSe
this.eventHandlerDelegatorService = eventHandlerDelegatorService;
this.natsConnection = natsConnection;
this.constants = constants;
this.subscriberExecutor = new EnhancedQueueExecutor.Builder()
.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("jet-stream-subscriber-%d").build())
.setCorePoolSize(10).setMaximumPoolSize(10).setKeepAliveTime(Duration.ofSeconds(60)).build();
this.initializeStreamTopicMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static ca.bc.gov.educ.api.dataconversion.constant.EventStatus.DB_COMMITTED;

/**
Expand Down Expand Up @@ -46,12 +48,13 @@ public JetStreamEventScheduler(final EventRepository eventRepository,
@Scheduled(cron = "${cron.scheduled.process.events.stan.run}") // minimum = every 5 minute
@SchedulerLock(name = "PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.stan.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.stan.lockAtMostFor}")
public void findAndProcessEvents() {
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor());
LockAssert.assertLocked();
log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor());
final var results = this.eventRepository.findAllByEventStatusOrderByCreateDate(DB_COMMITTED.toString());
if (!results.isEmpty()) {
var filteredList = results.stream().filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5))).toList();
int cnt = 0;
for (Event e : results) {
for (Event e : filteredList) {
if (cnt++ >= constants.getTraxToGradProcessingThreshold()) {
log.info(" ==> Reached the processing threshold of {}", constants.getTraxToGradProcessingThreshold());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ public Event persistEventToDB(final ChoreographedEvent choreographedEvent) throw
.createDate(LocalDateTime.now())
.updateDate(LocalDateTime.now())
.build();
return eventRepository.save(event);
return this.eventRepository.save(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ public EventHandlerDelegatorService(final ChoreographedEventPersistenceService c
public void handleChoreographyEvent(@NonNull final ChoreographedEvent choreographedEvent, final Message message) throws IOException {
try {
final var persistedEvent = this.choreographedEventPersistenceService.persistEventToDB(choreographedEvent);
if (message.isJetStream()) {
message.ack(); // acknowledge to Jet Stream that api got the message and it is now in DB.
log.warn("acknowledged to Jet Stream...");
}
message.ack(); // acknowledge to Jet Stream that api got the message and it is now in DB.
log.warn("acknowledged to Jet Stream...");
this.choreographer.handleEvent(persistedEvent);
} catch (final BusinessException businessException) {
message.ack(); // acknowledge to Jet Stream that api got the message already...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public <T extends Object> void processEvent(T request, Event event) {
StudentGradDTO currentStudent = studentProcess.loadStudentData(studentAssessmentUpdate.getPen(), accessToken);
if (currentStudent != null) {
processAssessment(studentAssessmentUpdate, currentStudent, accessToken);
} else {
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public <T extends Object> void processEvent(T request, Event event) {
StudentGradDTO currentStudent = studentProcess.loadStudentData(studentCourseUpdate.getPen(), accessToken);
if (currentStudent != null) {
processCourse(studentCourseUpdate, currentStudent, accessToken);
} else {
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public <T extends Object> void processEvent(T request, Event event) {
StudentGradDTO currentStudent = studentProcess.loadStudentData(updateDemog.getPen(), accessToken);
if (currentStudent != null) {
processStudentDemographics(updateDemog, currentStudent, accessToken);
} else {
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public <T extends Object> void processEvent(T request, Event event) {
StudentGradDTO currentStudent = studentProcess.loadStudentData(frenchImmersionUpdate.getPen(), accessToken);
if (currentStudent != null) {
processFrenchImmersion(frenchImmersionUpdate, currentStudent, accessToken);
} else {
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public <T extends Object> void processEvent(T request, Event event) {
StudentGradDTO currentStudent = studentProcess.loadStudentData(updateGrad.getPen(), accessToken);
if (currentStudent != null) {
processStudent(updateGrad, currentStudent, accessToken);
} else {
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public <T extends Object> void processEvent(T request, Event event) {
StudentGradDTO currentStudent = studentProcess.loadStudentData(studentStatusUpdate.getPen(), accessToken);
if (currentStudent != null) {
processStudent(studentStatusUpdate, currentStudent, accessToken);
} else {
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public <T extends Object> void processEvent(T request, Event event) {
StudentGradDTO currentStudent = studentProcess.loadStudentData(xprogram.getPen(), accessToken);
if (currentStudent != null) {
processOptionalAndCareerPrograms(xprogram, currentStudent, accessToken);
} else {
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,6 @@ public void testProcessStudentAssessment_whenException_isThrown_returnsAPICallEr
studentAssessmentUpdateEventService.processEvent(traxStudentUpdate, event);

assertThat(event).isNotNull();
assertThat(event.getEventStatus()).isEqualTo(EventStatus.PROCESSED.name());
assertThat(event.getEventStatus()).isEqualTo(EventStatus.DB_COMMITTED.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,6 @@ public void testProcessStudentCourse_whenException_isThrown_returnsAPICallError(
studentCourseUpdateEventService.processEvent(traxStudentUpdate, event);

assertThat(event).isNotNull();
assertThat(event.getEventStatus()).isEqualTo(EventStatus.PROCESSED.name());
assertThat(event.getEventStatus()).isEqualTo(EventStatus.DB_COMMITTED.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,6 @@ public void testProcessStudentDemographics_whenException_isThrown_returnsAPICall
studentDemographicsUpdateEventService.processEvent(traxDemog, event);

assertThat(event).isNotNull();
assertThat(event.getEventStatus()).isEqualTo(EventStatus.PROCESSED.name());
assertThat(event.getEventStatus()).isEqualTo(EventStatus.DB_COMMITTED.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void testProcessStudentForFrenchImmersion_whenException_isThrown_returnsA
studentFrenchImmersionEventService.processEvent(traxFrenchImmersion, event);

assertThat(event).isNotNull();
assertThat(event.getEventStatus()).isEqualTo(EventStatus.PROCESSED.name());
assertThat(event.getEventStatus()).isEqualTo(EventStatus.DB_COMMITTED.name());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ public void testProcessStudentForGrad2018ENProgram_whenException_isThrown_return
studentGraduationUpdateEventService.processEvent(traxStudentUpdate, event);

assertThat(event).isNotNull();
assertThat(event.getEventStatus()).isEqualTo(EventStatus.PROCESSED.name());
assertThat(event.getEventStatus()).isEqualTo(EventStatus.DB_COMMITTED.name());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,6 @@ public void testProcessStudentStatus_whenException_isThrown_returnsAPICallError(
studentStatusUpdateEventService.processEvent(traxStudentUpdate, event);

assertThat(event).isNotNull();
assertThat(event.getEventStatus()).isEqualTo(EventStatus.PROCESSED.name());
assertThat(event.getEventStatus()).isEqualTo(EventStatus.DB_COMMITTED.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,6 @@ public void testProcessStudentForXPrograms_whenException_isThrown_returnsAPICall
studentXProgramEventService.processEvent(traxXProgram, event);

assertThat(event).isNotNull();
assertThat(event.getEventStatus()).isEqualTo(EventStatus.PROCESSED.name());
assertThat(event.getEventStatus()).isEqualTo(EventStatus.DB_COMMITTED.name());
}
}

0 comments on commit eb7ae00

Please sign in to comment.