Skip to content

Commit

Permalink
Correct/go back trace overlap (#2203)
Browse files Browse the repository at this point in the history
* Fix the go back trace overlap.

* Fix some typo. And unnecessary copy by reference.

* Fix the note comment.

* Add a case to cover the issue that back tracker will overlap the dst, src in current step.

Co-authored-by: jude-zhu <51590253+jude-zhu@users.noreply.github.com>
  • Loading branch information
Shylock-Hg and jude-zhu authored Jul 7, 2020
1 parent cc91cbc commit 3132e39
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 84 deletions.
67 changes: 53 additions & 14 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,11 +589,12 @@ void GoExecutor::stepOut() {
void GoExecutor::onStepOutResponse(RpcResponse &&rpcResp) {
joinResp(std::move(rpcResp));

// back trace each step
CHECK_GT(records_.size(), 0);
auto dsts = getDstIdsFromRespWithBackTrack(records_.back());
if (isFinalStep()) {
GO_EXIT();
} else {
CHECK_GT(records_.size(), 0);
auto dsts = getDstIdsFromResps(records_.end() - 1, records_.end());
starts_ = std::move(dsts);
if (starts_.empty()) {
GO_EXIT();
Expand Down Expand Up @@ -650,9 +651,6 @@ std::vector<VertexID> GoExecutor::getDstIdsFromResps(std::vector<RpcResponse>::i
for (const auto &edata : vdata.edge_data) {
for (const auto& edge : edata.get_edges()) {
auto dst = edge.get_dst();
if (!isFinalStep() && backTracker_ != nullptr) {
backTracker_->add(vdata.get_vertex_id(), dst);
}
set.emplace(dst);
}
}
Expand All @@ -662,6 +660,47 @@ std::vector<VertexID> GoExecutor::getDstIdsFromResps(std::vector<RpcResponse>::i
return std::vector<VertexID>(set.begin(), set.end());
}

std::vector<VertexID> GoExecutor::getDstIdsFromRespWithBackTrack(const RpcResponse &rpcResp) const {
// back trace in current step
// To avoid overlap in current step edges
// For example
// Dst , Src
// 6 , 1
// 7 , 6
// Will mistake lead to 7->6 if insert edge(dst, src) one by one
// So read all roots of current step first , then insert them
std::multimap<VertexID, VertexID> backTrace;
std::unordered_set<VertexID> set;
for (const auto &resp : rpcResp.responses()) {
auto *vertices = resp.get_vertices();
if (vertices == nullptr) {
continue;
}

for (const auto &vdata : *vertices) {
for (const auto &edata : vdata.edge_data) {
for (const auto& edge : edata.get_edges()) {
auto dst = edge.get_dst();
if (!isFinalStep() && backTracker_ != nullptr) {
auto range = backTracker_->get(vdata.get_vertex_id());
if (range.first == range.second) { // not found root
backTrace.emplace(dst, vdata.get_vertex_id());
}
for (auto trace = range.first; trace != range.second; ++trace) {
backTrace.emplace(dst, trace->second);
}
}
set.emplace(dst);
}
}
}
}
if (!isFinalStep() && backTracker_ != nullptr) {
backTracker_->inject(backTrace);
}
return std::vector<VertexID>(set.begin(), set.end());
}

void GoExecutor::finishExecution() {
// MayBe we can do better.
std::vector<std::unique_ptr<YieldColumn>> yc;
Expand Down Expand Up @@ -1071,15 +1110,15 @@ bool GoExecutor::processFinalResult(Callback cb) const {
VLOG(1) << "Total vdata.edge_data size " << vdata.edge_data.size();
auto tagData = vdata.get_tag_data();
auto srcId = vdata.get_vertex_id();
const auto rootId = getRoot(srcId, recordIn);
auto inputRows = index_->rowsOfVid(rootId);
const auto roots = getRoots(srcId, recordIn);
auto inputRows = index_->rowsOfVids(roots);
// Here if join the input we extend the input rows coresponding to current vertex;
// Or just loop once as previous that not join anything,
// in fact result what in responses.
bool notJoinOnce = false;
for (auto inputRow = inputRows.first;
!joinInput || inputRow != inputRows.second;
joinInput ? ++inputRow : inputRow) {
for (auto inputRow = inputRows.begin();
!joinInput || inputRow != inputRows.end();
joinInput ? ++inputRow : inputRow) {
if (!joinInput) {
if (notJoinOnce) {
break;
Expand Down Expand Up @@ -1176,11 +1215,11 @@ bool GoExecutor::processFinalResult(Callback cb) const {
return ret.value();
};
getters.getVariableProp = [inputRow, this] (const std::string &prop) {
return index_->getColumnWithRow(inputRow->second, prop);
return index_->getColumnWithRow(*inputRow, prop);
};

getters.getInputProp = [inputRow, this] (const std::string &prop) {
return index_->getColumnWithRow(inputRow->second, prop);
return index_->getColumnWithRow(*inputRow, prop);
};

std::unique_ptr<RowReader> reader;
Expand Down Expand Up @@ -1269,8 +1308,8 @@ bool GoExecutor::processFinalResult(Callback cb) const {
}
} // for edges
} // for edata
} // for `vdata'
} // extend input rows
} // for input rows
} // for vdata
} // for `resp'
}
return true;
Expand Down
46 changes: 25 additions & 21 deletions src/graph/GoExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,6 @@ class GoExecutor final : public TraverseExecutor {
return false;
}

VertexID getRoot(VertexID srcId, std::size_t record) const {
CHECK_GT(record, 0);
VertexID rootId = srcId;
if (record == 1) {
return rootId;
}
rootId = DCHECK_NOTNULL(backTracker_)->get(srcId);
return rootId;
}

/**
* To obtain the source ids from various places,
* such as the literal id list, inputs from the pipeline or results of variable.
Expand Down Expand Up @@ -141,6 +131,8 @@ class GoExecutor final : public TraverseExecutor {
std::vector<VertexID> getDstIdsFromResps(std::vector<RpcResponse>::iterator begin,
std::vector<RpcResponse>::iterator end) const;

std::vector<VertexID> getDstIdsFromRespWithBackTrack(const RpcResponse &rpcResp) const;

/**
* get the edgeName when over all edges
*/
Expand Down Expand Up @@ -196,23 +188,20 @@ class GoExecutor final : public TraverseExecutor {

class VertexBackTracker final {
public:
void add(VertexID src, VertexID dst) {
VertexID value = src;
auto iter = mapping_.find(src);
if (iter != mapping_.end()) {
value = iter->second;
void inject(const std::multimap<VertexID, VertexID> &backTrace) {
// TODO(shylock) c++17 merge directly
for (const auto iter : backTrace) {
mapping_.emplace(iter.first, iter.second);
}
mapping_[dst] = value;
}

VertexID get(VertexID id) {
auto iter = mapping_.find(id);
DCHECK(iter != mapping_.end());
return iter->second;
auto get(VertexID id) const {
auto range = mapping_.equal_range(id);
return range;
}

private:
std::unordered_map<VertexID, VertexID> mapping_;
std::multimap<VertexID, VertexID> mapping_;
};

enum FromType {
Expand All @@ -224,6 +213,21 @@ class GoExecutor final : public TraverseExecutor {
// Join the RPC response to previous data
void joinResp(RpcResponse &&resp);

std::vector<VertexID> getRoots(VertexID srcId, std::size_t record) const {
CHECK_GT(record, 0);
std::vector<VertexID> ids;
if (record == 1) {
ids.emplace_back(srcId);
return ids;
}
const auto range = DCHECK_NOTNULL(backTracker_)->get(srcId);
for (auto i = range.first; i != range.second; ++i) {
ids.emplace_back(i->second);
}
return ids;
}


private:
GoSentence *sentence_{nullptr};
FromType fromType_{kInstantExpr};
Expand Down
11 changes: 11 additions & 0 deletions src/graph/InterimResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ class InterimResult final {
return vidToRowIndex_.equal_range(id);
}

std::vector<uint32_t> rowsOfVids(const std::vector<VertexID> &ids) {
std::vector<uint32_t> rows;
for (const auto vid : ids) {
const auto range = rowsOfVid(vid);
for (auto i = range.first; i != range.second; ++i) {
rows.emplace_back(i->second);
}
}
return rows;
}

private:
friend class InterimResult;
using Row = std::vector<VariantType>;
Expand Down
Loading

0 comments on commit 3132e39

Please sign in to comment.