Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add performance tracing to pipeline phases #1424

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 101 additions & 2 deletions distr/flecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -25348,13 +25348,18 @@ struct ecs_pipeline_state_t {
ecs_vec_t ops; /* Pipeline schedule */
ecs_vec_t systems; /* Vector with system ids */

ecs_vec_t phase_offsets; /* Vector of offsets into phase_names (for perf tracing) */
ecs_vec_t phase_names; /* Vector with phase names (for perf tracing) */

ecs_entity_t last_system; /* Last system ran by pipeline */
ecs_id_record_t *idr_inactive; /* Cached record for quick inactive test */
int32_t match_count; /* Used to track of rebuild is necessary */
int32_t rebuild_count; /* Number of pipeline rebuilds */
ecs_iter_t *iters; /* Iterator for worker(s) */
int32_t iter_count;

int8_t query_phase_term; /* Pipeline query phase term if != 0 (for perf tracing) */

/* Members for continuing pipeline iteration after pipeline rebuild */
ecs_pipeline_op_t *cur_op; /* Current pipeline op */
int32_t cur_i; /* Index in current result */
Expand Down Expand Up @@ -52683,6 +52688,17 @@ static void flecs_pipeline_free(
ecs_allocator_t *a = &world->allocator;
ecs_vec_fini_t(a, &p->ops, ecs_pipeline_op_t);
ecs_vec_fini_t(a, &p->systems, ecs_entity_t);

#ifdef FLECS_PERF_TRACE
ecs_vec_fini_t(a, &p->phase_offsets, int32_t);
int32_t i, count = ecs_vec_count(&p->phase_names);
const char** phase_names = ecs_vec_first_t(&p->phase_names, const char*);
for (i = 0; i < count; i ++) {
ecs_os_free(ECS_CONST_CAST(char*, phase_names[i]));
}
ecs_vec_fini_t(a, &p->phase_names, const char*);
#endif

ecs_os_free(p->iters);
ecs_query_fini(p->query);
ecs_os_free(p);
Expand Down Expand Up @@ -52943,6 +52959,14 @@ bool flecs_pipeline_build(
ecs_vec_reset_t(a, &pq->ops, ecs_pipeline_op_t);
ecs_vec_reset_t(a, &pq->systems, ecs_entity_t);

#ifdef FLECS_PERF_TRACE
ecs_vec_reset_t(a, &pq->phase_offsets, int32_t);
ecs_vec_reset_t(a, &pq->phase_names, const char*);
/* Local map for building up phase_offsets & phase_names */
ecs_map_t phase_offset_map;
ecs_map_init(&phase_offset_map, a);
#endif

bool multi_threaded = false;
bool immediate = false;
bool first = true;
Expand All @@ -52953,6 +52977,25 @@ bool flecs_pipeline_build(
bool is_active = ecs_table_get_type_index(
world, it.table, EcsEmpty) == -1;

#ifdef FLECS_PERF_TRACE
int32_t phase_offset = 0;
if (pq->query_phase_term) {
ecs_entity_t phase = ecs_field_src(&it, pq->query_phase_term);

ecs_map_val_t* phase_offset_p = ecs_map_get(&phase_offset_map, phase);
if (!phase_offset_p) {
/* New phase, record its name into the name vector */
phase_offset = ecs_vec_count(&pq->phase_names);
const char* phase_name = ecs_get_path(world, phase);

ecs_map_insert(&phase_offset_map, phase, (uint64_t)(phase_offset));
ecs_vec_append_t(a, &pq->phase_names, const char*)[0] = phase_name;
} else {
phase_offset = (int32_t)*phase_offset_p;
}
}
#endif

int32_t i;
for (i = 0; i < it.count; i ++) {
flecs_poly_assert(poly[i].poly, ecs_system_t);
Expand Down Expand Up @@ -53026,6 +53069,14 @@ bool flecs_pipeline_build(
if (is_active) {
ecs_vec_append_t(a, &pq->systems, ecs_entity_t)[0] =
it.entities[i];

#ifdef FLECS_PERF_TRACE
if (pq->query_phase_term) {
/* Each system in the systems vector has a corresponding phase offset */
ecs_vec_append_t(a, &pq->phase_offsets, int32_t)[0] = phase_offset;
}
#endif

if (!op->count) {
op->multi_threaded = multi_threaded;
op->immediate = immediate;
Expand All @@ -53042,6 +53093,10 @@ bool flecs_pipeline_build(
ecs_map_fini(&ws.ids);
ecs_map_fini(&ws.wildcard_ids);

#ifdef FLECS_PERF_TRACE
ecs_map_fini(&phase_offset_map);
#endif

op = ecs_vec_first_t(&pq->ops, ecs_pipeline_op_t);

if (!op) {
Expand Down Expand Up @@ -53219,7 +53274,29 @@ int32_t flecs_run_pipeline_ops(
ecs_entity_t* systems = ecs_vec_first_t(&pq->systems, ecs_entity_t);
int32_t ran_since_merge = i - op->offset;

#ifdef FLECS_PERF_TRACE
int32_t* phase_offsets = ecs_vec_first_t(&pq->phase_offsets, int32_t);
const char** phase_names = ecs_vec_first_t(&pq->phase_names, const char*);
#endif

for (; i < count; i++) {
#ifdef FLECS_PERF_TRACE
if (pq->query_phase_term) {
if (i > 0) {
int32_t phase = phase_offsets[i];
int32_t last_phase = phase_offsets[i - 1];

if (phase != last_phase) {
/* Close the span of the previous phase and open the current one.
* The first/last phases are handled in flecs_run_pipeline because
* this function may run multiple times during one pipeline. */
ecs_os_perf_trace_pop(phase_names[last_phase]);
ecs_os_perf_trace_push(phase_names[phase]);
}
}
}
#endif

ecs_entity_t system = systems[i];
const EcsPoly* poly = ecs_get_pair(world, system, EcsPoly, EcsSystem);
flecs_poly_assert(poly->poly, ecs_system_t);
Expand Down Expand Up @@ -53278,6 +53355,17 @@ void flecs_run_pipeline(
// Update the pipeline before waking the workers.
flecs_pipeline_update(world, pq, true);

#ifdef FLECS_PERF_TRACE
int32_t* phase_offsets = ecs_vec_first_t(&pq->phase_offsets, int32_t);
const char** phase_names = ecs_vec_first_t(&pq->phase_names, const char*);

if (pq->query_phase_term && phase_offsets && phase_names) {
/* Open the span of the first phase in the pipeline.
* Intermediate phases are handled in flecs_run_pipeline_ops. */
ecs_os_perf_trace_push(phase_names[phase_offsets[0]]);
}
#endif

// If there are no operations to execute in the pipeline bail early,
// no need to wake the workers since they have nothing to do.
while (pq->cur_op != NULL) {
Expand Down Expand Up @@ -53344,6 +53432,14 @@ void flecs_run_pipeline(

flecs_pipeline_update(world, pq, false);
}

#ifdef FLECS_PERF_TRACE
if (pq->query_phase_term && phase_offsets && phase_names) {
int32_t last_phase_offset = ecs_vec_last_t(&pq->phase_offsets, int32_t)[0];
/* Close the span of the first phase in the pipeline */
ecs_os_perf_trace_pop(phase_names[last_phase_offset]);
}
#endif
}

static
Expand Down Expand Up @@ -53377,7 +53473,8 @@ void flecs_run_startup_systems(
{ .id = EcsDisabled, .src.id = EcsUp, .trav = EcsChildOf, .oper = EcsNot }
},
.order_by_callback = flecs_entity_compare
}
},
.query_phase_term = 1,
});
ecs_log_pop_2();

Expand Down Expand Up @@ -53507,6 +53604,7 @@ ecs_entity_t ecs_pipeline_init(

ecs_pipeline_state_t *pq = ecs_os_calloc_t(ecs_pipeline_state_t);
pq->query = query;
pq->query_phase_term = desc->query_phase_term;
pq->match_count = -1;
pq->idr_inactive = flecs_id_record_ensure(world, EcsEmpty);
ecs_set(world, result, EcsPipeline, { pq });
Expand Down Expand Up @@ -53604,7 +53702,8 @@ void FlecsPipelineImport(
{ .id = EcsDisabled, .src.id = EcsUp, .trav = EcsChildOf, .oper = EcsNot }
},
.order_by_callback = flecs_entity_compare
}
},
.query_phase_term = 1,
});

/* Cleanup thread administration when world is destroyed */
Expand Down
6 changes: 6 additions & 0 deletions distr/flecs.h
Original file line number Diff line number Diff line change
Expand Up @@ -11921,6 +11921,12 @@ typedef struct ecs_pipeline_desc_t {
* pipeline query works.
*/
ecs_query_desc_t query;

/** The pipeline query's phase term (optional).
* If non-zero, the source entity of this term will be considered a "phase"
* for the purposes of performance tracing.
*/
int8_t query_phase_term;
} ecs_pipeline_desc_t;

/** Create a custom pipeline.
Expand Down
6 changes: 6 additions & 0 deletions include/flecs/addons/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ typedef struct ecs_pipeline_desc_t {
* pipeline query works.
*/
ecs_query_desc_t query;

/** The pipeline query's phase term (optional).
* If non-zero, the source entity of this term will be considered a "phase"
* for the purposes of performance tracing.
*/
int8_t query_phase_term;
} ecs_pipeline_desc_t;

/** Create a custom pipeline.
Expand Down
98 changes: 96 additions & 2 deletions src/addons/pipeline/pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ static void flecs_pipeline_free(
ecs_allocator_t *a = &world->allocator;
ecs_vec_fini_t(a, &p->ops, ecs_pipeline_op_t);
ecs_vec_fini_t(a, &p->systems, ecs_entity_t);

#ifdef FLECS_PERF_TRACE
ecs_vec_fini_t(a, &p->phase_offsets, int32_t);
int32_t i, count = ecs_vec_count(&p->phase_names);
const char** phase_names = ecs_vec_first_t(&p->phase_names, const char*);
for (i = 0; i < count; i ++) {
ecs_os_free(ECS_CONST_CAST(char*, phase_names[i]));
}
ecs_vec_fini_t(a, &p->phase_names, const char*);
#endif

ecs_os_free(p->iters);
ecs_query_fini(p->query);
ecs_os_free(p);
Expand Down Expand Up @@ -277,6 +288,14 @@ bool flecs_pipeline_build(
ecs_vec_reset_t(a, &pq->ops, ecs_pipeline_op_t);
ecs_vec_reset_t(a, &pq->systems, ecs_entity_t);

#ifdef FLECS_PERF_TRACE
ecs_vec_reset_t(a, &pq->phase_offsets, int32_t);
ecs_vec_reset_t(a, &pq->phase_names, const char*);
/* Local map for building up phase_offsets & phase_names */
ecs_map_t phase_offset_map;
ecs_map_init(&phase_offset_map, a);
#endif

bool multi_threaded = false;
bool immediate = false;
bool first = true;
Expand All @@ -287,6 +306,25 @@ bool flecs_pipeline_build(
bool is_active = ecs_table_get_type_index(
world, it.table, EcsEmpty) == -1;

#ifdef FLECS_PERF_TRACE
int32_t phase_offset = 0;
if (pq->query_phase_term) {
ecs_entity_t phase = ecs_field_src(&it, pq->query_phase_term);

ecs_map_val_t* phase_offset_p = ecs_map_get(&phase_offset_map, phase);
if (!phase_offset_p) {
/* New phase, record its name into the name vector */
phase_offset = ecs_vec_count(&pq->phase_names);
const char* phase_name = ecs_get_path(world, phase);

ecs_map_insert(&phase_offset_map, phase, (uint64_t)(phase_offset));
ecs_vec_append_t(a, &pq->phase_names, const char*)[0] = phase_name;
} else {
phase_offset = (int32_t)*phase_offset_p;
}
}
#endif

int32_t i;
for (i = 0; i < it.count; i ++) {
flecs_poly_assert(poly[i].poly, ecs_system_t);
Expand Down Expand Up @@ -360,6 +398,14 @@ bool flecs_pipeline_build(
if (is_active) {
ecs_vec_append_t(a, &pq->systems, ecs_entity_t)[0] =
it.entities[i];

#ifdef FLECS_PERF_TRACE
if (pq->query_phase_term) {
/* Each system in the systems vector has a corresponding phase offset */
ecs_vec_append_t(a, &pq->phase_offsets, int32_t)[0] = phase_offset;
}
#endif

if (!op->count) {
op->multi_threaded = multi_threaded;
op->immediate = immediate;
Expand All @@ -376,6 +422,10 @@ bool flecs_pipeline_build(
ecs_map_fini(&ws.ids);
ecs_map_fini(&ws.wildcard_ids);

#ifdef FLECS_PERF_TRACE
ecs_map_fini(&phase_offset_map);
#endif

op = ecs_vec_first_t(&pq->ops, ecs_pipeline_op_t);

if (!op) {
Expand Down Expand Up @@ -553,7 +603,29 @@ int32_t flecs_run_pipeline_ops(
ecs_entity_t* systems = ecs_vec_first_t(&pq->systems, ecs_entity_t);
int32_t ran_since_merge = i - op->offset;

#ifdef FLECS_PERF_TRACE
int32_t* phase_offsets = ecs_vec_first_t(&pq->phase_offsets, int32_t);
const char** phase_names = ecs_vec_first_t(&pq->phase_names, const char*);
#endif

for (; i < count; i++) {
#ifdef FLECS_PERF_TRACE
if (pq->query_phase_term) {
if (i > 0) {
int32_t phase = phase_offsets[i];
int32_t last_phase = phase_offsets[i - 1];

if (phase != last_phase) {
/* Close the span of the previous phase and open the current one.
* The first/last phases are handled in flecs_run_pipeline because
* this function may run multiple times during one pipeline. */
ecs_os_perf_trace_pop(phase_names[last_phase]);
ecs_os_perf_trace_push(phase_names[phase]);
}
}
}
#endif

ecs_entity_t system = systems[i];
const EcsPoly* poly = ecs_get_pair(world, system, EcsPoly, EcsSystem);
flecs_poly_assert(poly->poly, ecs_system_t);
Expand Down Expand Up @@ -612,6 +684,17 @@ void flecs_run_pipeline(
// Update the pipeline before waking the workers.
flecs_pipeline_update(world, pq, true);

#ifdef FLECS_PERF_TRACE
int32_t* phase_offsets = ecs_vec_first_t(&pq->phase_offsets, int32_t);
const char** phase_names = ecs_vec_first_t(&pq->phase_names, const char*);

if (pq->query_phase_term && phase_offsets && phase_names) {
/* Open the span of the first phase in the pipeline.
* Intermediate phases are handled in flecs_run_pipeline_ops. */
ecs_os_perf_trace_push(phase_names[phase_offsets[0]]);
}
#endif

// If there are no operations to execute in the pipeline bail early,
// no need to wake the workers since they have nothing to do.
while (pq->cur_op != NULL) {
Expand Down Expand Up @@ -678,6 +761,14 @@ void flecs_run_pipeline(

flecs_pipeline_update(world, pq, false);
}

#ifdef FLECS_PERF_TRACE
if (pq->query_phase_term && phase_offsets && phase_names) {
int32_t last_phase_offset = ecs_vec_last_t(&pq->phase_offsets, int32_t)[0];
/* Close the span of the first phase in the pipeline */
ecs_os_perf_trace_pop(phase_names[last_phase_offset]);
}
#endif
}

static
Expand Down Expand Up @@ -711,7 +802,8 @@ void flecs_run_startup_systems(
{ .id = EcsDisabled, .src.id = EcsUp, .trav = EcsChildOf, .oper = EcsNot }
},
.order_by_callback = flecs_entity_compare
}
},
.query_phase_term = 1,
});
ecs_log_pop_2();

Expand Down Expand Up @@ -841,6 +933,7 @@ ecs_entity_t ecs_pipeline_init(

ecs_pipeline_state_t *pq = ecs_os_calloc_t(ecs_pipeline_state_t);
pq->query = query;
pq->query_phase_term = desc->query_phase_term;
pq->match_count = -1;
pq->idr_inactive = flecs_id_record_ensure(world, EcsEmpty);
ecs_set(world, result, EcsPipeline, { pq });
Expand Down Expand Up @@ -938,7 +1031,8 @@ void FlecsPipelineImport(
{ .id = EcsDisabled, .src.id = EcsUp, .trav = EcsChildOf, .oper = EcsNot }
},
.order_by_callback = flecs_entity_compare
}
},
.query_phase_term = 1,
});

/* Cleanup thread administration when world is destroyed */
Expand Down
Loading