Skip to content

Commit

Permalink
feat: use generic event publisher for mysql consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
rgomezcasas committed Nov 27, 2023
1 parent c577764 commit 8e3ee2d
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 121 deletions.
91 changes: 46 additions & 45 deletions src/mooc/main/resources/database/mooc.sql
Original file line number Diff line number Diff line change
@@ -1,62 +1,63 @@
CREATE TABLE IF NOT EXISTS courses (
id CHAR(36) NOT NULL,
name VARCHAR(255) NOT NULL,
duration VARCHAR(255) NOT NULL,
PRIMARY KEY (id)
id CHAR(36) NOT NULL,
name VARCHAR(255) NOT NULL,
duration VARCHAR(255) NOT NULL,
PRIMARY KEY (id)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;

CREATE TABLE IF NOT EXISTS courses_counter (
id CHAR(36) NOT NULL,
total INT NOT NULL,
existing_courses JSON NOT NULL,
PRIMARY KEY (id)
id CHAR(36) NOT NULL,
total INT NOT NULL,
existing_courses JSON NOT NULL,
PRIMARY KEY (id)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
INSERT IGNORE INTO courses_counter (id, total, existing_courses) VALUES ('efbaff16-8fcd-4689-9fc9-ec545d641c46', 0, '[]');
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
INSERT IGNORE INTO courses_counter (id, total, existing_courses)
VALUES ('efbaff16-8fcd-4689-9fc9-ec545d641c46', 0, '[]');

CREATE TABLE IF NOT EXISTS steps (
id CHAR(36) NOT NULL,
title VARCHAR(155) NOT NULL,
PRIMARY KEY (id)
id CHAR(36) NOT NULL,
title VARCHAR(155) NOT NULL,
PRIMARY KEY (id)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;

CREATE TABLE IF NOT EXISTS steps_challenges (
id CHAR(36) NOT NULL,
statement TEXT NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_steps_challenges__step_id FOREIGN KEY (id) REFERENCES steps(id)
id CHAR(36) NOT NULL,
statement TEXT NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_steps_challenges__step_id FOREIGN KEY (id) REFERENCES steps(id)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;

CREATE TABLE IF NOT EXISTS steps_videos (
id CHAR(36) NOT NULL,
url VARCHAR(255) NOT NULL,
text TEXT NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_steps_video__step_id FOREIGN KEY (id) REFERENCES steps(id)
id CHAR(36) NOT NULL,
url VARCHAR(255) NOT NULL,
text TEXT NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_steps_video__step_id FOREIGN KEY (id) REFERENCES steps(id)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;

CREATE TABLE IF NOT EXISTS domain_events (
id CHAR(36) NOT NULL,
aggregate_id CHAR(36) NOT NULL,
name VARCHAR(255) NOT NULL,
body JSON NOT NULL,
occurred_on TIMESTAMP NOT NULL,
PRIMARY KEY (id)
id CHAR(36) NOT NULL,
aggregate_id CHAR(36) NOT NULL,
name VARCHAR(255) NOT NULL,
body JSON NOT NULL,
occurred_on TIMESTAMP NOT NULL,
PRIMARY KEY (id)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.springframework.beans.factory.annotation.Qualifier;
import tv.codely.shared.domain.Utils;
import tv.codely.shared.domain.bus.event.DomainEvent;
import tv.codely.shared.domain.bus.event.EventBus;
import tv.codely.shared.infrastructure.bus.event.DomainEventsInformation;
import tv.codely.shared.infrastructure.bus.event.spring.SpringApplicationEventBus;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand All @@ -17,79 +17,80 @@
import java.util.List;

public class MySqlDomainEventsConsumer {
private final SessionFactory sessionFactory;
private final DomainEventsInformation domainEventsInformation;
private final SpringApplicationEventBus bus;
private final Integer CHUNKS = 200;
private Boolean shouldStop = false;

public MySqlDomainEventsConsumer(
@Qualifier("mooc-session_factory") SessionFactory sessionFactory,
DomainEventsInformation domainEventsInformation,
SpringApplicationEventBus bus
) {
this.sessionFactory = sessionFactory;
this.domainEventsInformation = domainEventsInformation;
this.bus = bus;
}

@Transactional
public void consume() {
while (!shouldStop) {
NativeQuery query = sessionFactory.getCurrentSession().createNativeQuery(
"SELECT * FROM domain_events ORDER BY occurred_on ASC LIMIT :chunk"
);

query.setParameter("chunk", CHUNKS);

List<Object[]> events = query.list();

try {
for (Object[] event : events) {
executeSubscribers(
(String) event[0],
(String) event[1],
(String) event[2],
(String) event[3],
(Timestamp) event[4]
);
}
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
e.printStackTrace();
}

sessionFactory.getCurrentSession().clear();
}
}

public void stop() {
shouldStop = true;
}

private void executeSubscribers(
String id, String aggregateId, String eventName, String body, Timestamp occurredOn
) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {

Class<? extends DomainEvent> domainEventClass = domainEventsInformation.forName(eventName);

DomainEvent nullInstance = domainEventClass.getConstructor().newInstance();

Method fromPrimitivesMethod = domainEventClass.getMethod(
"fromPrimitives",
String.class,
HashMap.class,
String.class,
String.class
);

Object domainEvent = fromPrimitivesMethod.invoke(
nullInstance,
aggregateId,
Utils.jsonDecode(body),
id,
Utils.dateToString(occurredOn)
);

bus.publish(Collections.singletonList((DomainEvent) domainEvent));
}
private final SessionFactory sessionFactory;
private final DomainEventsInformation domainEventsInformation;
private final EventBus bus;
private final Integer CHUNKS = 200;
private Boolean shouldStop = false;

public MySqlDomainEventsConsumer(
@Qualifier("mooc-session_factory") SessionFactory sessionFactory,
DomainEventsInformation domainEventsInformation,
EventBus bus
) {
this.sessionFactory = sessionFactory;
this.domainEventsInformation = domainEventsInformation;
this.bus = bus;
}

@Transactional
public void consume() {
while (!shouldStop) {
NativeQuery query = sessionFactory.getCurrentSession().createNativeQuery(
"SELECT * FROM domain_events ORDER BY occurred_on ASC LIMIT :chunk"
);

query.setParameter("chunk", CHUNKS);

List<Object[]> events = query.list();

try {
for (Object[] event : events) {
executeSubscribers(
(String) event[0],
(String) event[1],
(String) event[2],
(String) event[3],
(Timestamp) event[4]
);
}
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException |
InstantiationException e) {
e.printStackTrace();
}

sessionFactory.getCurrentSession().clear();
}
}

public void stop() {
shouldStop = true;
}

private void executeSubscribers(
String id, String aggregateId, String eventName, String body, Timestamp occurredOn
) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {

Class<? extends DomainEvent> domainEventClass = domainEventsInformation.forName(eventName);

DomainEvent nullInstance = domainEventClass.getConstructor().newInstance();

Method fromPrimitivesMethod = domainEventClass.getMethod(
"fromPrimitives",
String.class,
HashMap.class,
String.class,
String.class
);

Object domainEvent = fromPrimitivesMethod.invoke(
nullInstance,
aggregateId,
Utils.jsonDecode(body),
id,
Utils.dateToString(occurredOn)
);

bus.publish(Collections.singletonList((DomainEvent) domainEvent));
}
}

0 comments on commit 8e3ee2d

Please sign in to comment.