Skip to content

Commit

Permalink
Materialized views for lineage events graph (#2891)
Browse files Browse the repository at this point in the history
* Lineage metrics table and left joined generated query.

* Removing log.

* Query formatting.

* Weekly metric query.

* Saving update for weekly and daily metrics.

* Making adjustments to storage to build materialized views from the `lineage_events` table

* Adding refresh job and daily mat view.

* Fixing table name.

* Fixing tests.

* Source code headers

* Deferring materialized view updates until the hour strikes.

* Fixing related to time intervals.

* Code review feedback addressed.

* Adding header.

---------

Co-authored-by: Willy Lulciuc <willy@datakin.com>
  • Loading branch information
phixMe and wslulciuc authored Sep 10, 2024
1 parent a586a89 commit d407e66
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 1 deletion.
4 changes: 4 additions & 0 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import marquez.common.Utils;
import marquez.db.DbMigration;
import marquez.jobs.DbRetentionJob;
import marquez.jobs.MaterializeViewRefresherJob;
import marquez.logging.DelegatingSqlLogger;
import marquez.logging.LabelledSqlLogger;
import marquez.logging.LoggingMdcFilter;
Expand Down Expand Up @@ -153,6 +154,9 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
env.lifecycle().manage(new DbRetentionJob(jdbi, config.getDbRetention()));
}

// Add job to refresh materialized views.
env.lifecycle().manage(new MaterializeViewRefresherJob(jdbi));

// set namespaceFilter
ExclusionsConfig exclusions = config.getExclude();
Exclusions.use(exclusions);
Expand Down
13 changes: 12 additions & 1 deletion api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import marquez.api.OpenLineageResource;
import marquez.api.SearchResource;
import marquez.api.SourceResource;
import marquez.api.StatsResource;
import marquez.api.TagResource;
import marquez.api.exceptions.JdbiExceptionExceptionMapper;
import marquez.api.exceptions.JsonProcessingExceptionMapper;
Expand All @@ -40,6 +41,7 @@
import marquez.db.RunStateDao;
import marquez.db.SearchDao;
import marquez.db.SourceDao;
import marquez.db.StatsDao;
import marquez.db.TagDao;
import marquez.graphql.GraphqlSchemaBuilder;
import marquez.graphql.MarquezGraphqlServletBuilder;
Expand All @@ -57,6 +59,7 @@
import marquez.service.SearchService;
import marquez.service.ServiceFactory;
import marquez.service.SourceService;
import marquez.service.StatsService;
import marquez.service.TagService;
import marquez.service.models.Tag;
import org.jdbi.v3.core.Jdbi;
Expand All @@ -80,6 +83,7 @@ public final class MarquezContext {
@Getter private final LineageDao lineageDao;
@Getter private final ColumnLineageDao columnLineageDao;
@Getter private final SearchDao searchDao;
@Getter private final StatsDao statsDao;
@Getter private final List<RunTransitionListener> runTransitionListeners;

@Getter private final NamespaceService namespaceService;
Expand All @@ -92,6 +96,7 @@ public final class MarquezContext {
@Getter private final LineageService lineageService;
@Getter private final ColumnLineageService columnLineageService;
@Getter private final SearchService searchService;
@Getter private final StatsService statsService;
@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
Expand All @@ -101,6 +106,7 @@ public final class MarquezContext {
@Getter private final OpenLineageResource openLineageResource;
@Getter private final marquez.api.v2beta.SearchResource v2BetasearchResource;
@Getter private final SearchResource searchResource;
@Getter private final StatsResource opsResource;
@Getter private final ImmutableList<Object> resources;
@Getter private final JdbiExceptionExceptionMapper jdbiException;
@Getter private final JsonProcessingExceptionMapper jsonException;
Expand Down Expand Up @@ -135,6 +141,7 @@ private MarquezContext(
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.statsDao = jdbi.onDemand(StatsDao.class);
this.runTransitionListeners = runTransitionListeners;

this.namespaceService = new NamespaceService(baseDao);
Expand All @@ -149,6 +156,7 @@ private MarquezContext(
this.lineageService = new LineageService(lineageDao, jobDao);
this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
this.searchService = new SearchService(searchConfig);
this.statsService = new StatsService(statsDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
this.jsonException = new JsonProcessingExceptionMapper();
final ServiceFactory serviceFactory =
Expand All @@ -165,6 +173,7 @@ private MarquezContext(
.columnLineageService(columnLineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.statsService(statsService)
.build();
this.namespaceResource = new NamespaceResource(serviceFactory);
this.sourceResource = new SourceResource(serviceFactory);
Expand All @@ -174,6 +183,7 @@ private MarquezContext(
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
this.searchResource = new SearchResource(searchDao);
this.opsResource = new StatsResource(serviceFactory);
this.v2BetasearchResource = new marquez.api.v2beta.SearchResource(serviceFactory);

this.resources =
Expand All @@ -188,7 +198,8 @@ private MarquezContext(
jsonException,
openLineageResource,
searchResource,
v2BetasearchResource);
v2BetasearchResource,
opsResource);

final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder();
this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi));
Expand Down
48 changes: 48 additions & 0 deletions api/src/main/java/marquez/api/StatsResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.api.models.Period;
import marquez.service.ServiceFactory;
import marquez.service.StatsService;

@Slf4j
@Path("/api/v1/stats")
public class StatsResource {

private final StatsService StatsService;

public StatsResource(@NonNull final ServiceFactory serviceFactory) {
this.StatsService = serviceFactory.getStatsService();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
@Path("/lineage-events")
public Response getStats(@QueryParam("period") Period period) {

return (Period.DAY.equals(period)
? Response.ok(StatsService.getLastDayLineageMetrics()).build()
: Period.WEEK.equals(period)
? Response.ok(StatsService.getLastWeekLineageMetrics()).build()
: Response.status(Response.Status.BAD_REQUEST).entity("Invalid period").build());
}
}
11 changes: 11 additions & 0 deletions api/src/main/java/marquez/api/models/Period.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api.models;

public enum Period {
DAY,
WEEK
}
8 changes: 8 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ private Columns() {}
public static final String EVENT = "event";
public static final String EVENT_TIME = "event_time";

/* METRICS ROW COLUMNS */
public static final String START_INTERVAL = "start_interval";
public static final String END_INTERVAL = "end_interval";
public static final String START = "start";
public static final String COMPLETE = "complete";
public static final String FAIL = "fail";
public static final String ABORT = "abort";

public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
return null;
Expand Down
72 changes: 72 additions & 0 deletions api/src/main/java/marquez/db/StatsDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.util.List;
import marquez.db.mappers.LineageMetricRowMapper;
import marquez.db.models.LineageMetric;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

@RegisterRowMapper(LineageMetricRowMapper.class)
public interface StatsDao extends BaseDao {

@SqlQuery(
"""
WITH hour_series AS (
SELECT
generate_series(
DATE_TRUNC('hour', NOW() - INTERVAL '1 day'),
DATE_TRUNC('hour', NOW() - INTERVAL '1 hour'),
'1 hour'
) AS start_interval
)
SELECT
hs.start_interval,
hs.start_interval + INTERVAL '1 hour' AS end_interval,
COALESCE(hourly_metrics.fail, 0) AS fail,
COALESCE(hourly_metrics.start, 0) AS start,
COALESCE(hourly_metrics.complete, 0) AS complete,
COALESCE(hourly_metrics.abort, 0) AS abort
FROM
hour_series hs
LEFT JOIN
lineage_events_by_type_hourly_view hourly_metrics
ON
hs.start_interval = hourly_metrics.start_interval
ORDER BY
hs.start_interval;
""")
List<LineageMetric> getLastDayMetrics();

@SqlQuery(
"""
WITH day_series AS (
SELECT
generate_series(
DATE_TRUNC('day', NOW() - INTERVAL '7 days'), -- Start 7 days ago
DATE_TRUNC('day', NOW()) - INTERVAL '1 day', -- End at the start of today
'1 day'
) AS start_interval
)
SELECT
ds.start_interval,
ds.start_interval + INTERVAL '1 day' AS end_interval,
COALESCE(mv.fail, 0) AS fail,
COALESCE(mv.start, 0) AS start,
COALESCE(mv.complete, 0) AS complete,
COALESCE(mv.abort, 0) AS abort
FROM
day_series ds
LEFT JOIN
lineage_events_by_type_daily_view mv
ON
ds.start_interval = mv.start_interval
ORDER BY
ds.start_interval;
""")
List<LineageMetric> getLastWeekMetrics();
}
31 changes: 31 additions & 0 deletions api/src/main/java/marquez/db/mappers/LineageMetricRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.intOrThrow;
import static marquez.db.Columns.timestampOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.LineageMetric;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class LineageMetricRowMapper implements RowMapper<LineageMetric> {
@Override
public LineageMetric map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new LineageMetric(
timestampOrThrow(results, Columns.START_INTERVAL),
timestampOrThrow(results, Columns.END_INTERVAL),
intOrThrow(results, Columns.FAIL),
intOrThrow(results, Columns.START),
intOrThrow(results, Columns.COMPLETE),
intOrThrow(results, Columns.ABORT));
}
}
21 changes: 21 additions & 0 deletions api/src/main/java/marquez/db/models/LineageMetric.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.time.Instant;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;

@Value
public class LineageMetric {
@Getter @NonNull Instant startInterval;
@Getter @NonNull Instant endInterval;
@Getter @NonNull Integer fail;
@Getter @NonNull Integer start;
@Getter @NonNull Integer complete;
@Getter @NonNull Integer abort;
}
77 changes: 77 additions & 0 deletions api/src/main/java/marquez/jobs/MaterializeViewRefresherJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2018-2024 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.jobs;

import com.google.common.util.concurrent.AbstractScheduledService;
import io.dropwizard.lifecycle.Managed;
import java.time.Duration;
import java.time.LocalTime;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Jdbi;

/** A job that refreshes materialized views on a fixed schedule in Marquez. */
@Slf4j
public class MaterializeViewRefresherJob extends AbstractScheduledService implements Managed {

private final int FREQUENCY = 60;
private final Scheduler fixedRateScheduler;
private final Jdbi jdbi;

public MaterializeViewRefresherJob(@NonNull final Jdbi jdbi) {
// Connection to database retention policy will be applied.
this.jdbi = jdbi;

// Define fixed schedule and delay until the hour strikes.
int MINUTES_IN_HOUR = 60;
LocalTime now = LocalTime.now();
int minutesRemaining =
MINUTES_IN_HOUR - now.getMinute(); // Get the remaining minutes in the hour
Duration duration = Duration.ofMinutes(minutesRemaining);
this.fixedRateScheduler =
Scheduler.newFixedRateSchedule(duration, Duration.ofMinutes(FREQUENCY));
}

@Override
protected Scheduler scheduler() {
return fixedRateScheduler;
}

@Override
public void start() throws Exception {
startAsync().awaitRunning();
log.info("Refreshing materialized views every '{}' mins.", FREQUENCY);
}

@Override
protected void runOneIteration() {
try {
log.info("Refreshing materialized views...");
jdbi.useHandle(
handle -> {
handle.execute("REFRESH MATERIALIZED VIEW lineage_events_by_type_hourly_view");
});
log.info("Materialized view `lineage_events_by_type_hourly_view` refreshed.");

if (java.time.LocalTime.now().getHour() == 0) {
jdbi.useHandle(
handle -> {
handle.execute("REFRESH MATERIALIZED VIEW lineage_events_by_type_daily_view");
});
log.info("Materialized view `lineage_events_by_type_daily_view` refreshed.");
}

} catch (Exception error) {
log.error("Failed to materialize views. Retrying on next run...", error);
}
}

@Override
public void stop() throws Exception {
log.info("Stopping materialized views job...");
stopAsync().awaitTerminated();
}
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/ServiceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ public class ServiceFactory {
@NonNull LineageService lineageService;
@NonNull ColumnLineageService columnLineageService;
@NonNull SearchService searchService;
@NonNull StatsService statsService;
}
Loading

0 comments on commit d407e66

Please sign in to comment.