Skip to content

Commit

Permalink
Cherry pick 3.2.1 (#4626)
Browse files Browse the repository at this point in the history
* fix lookup (#4552)

fix

Co-authored-by: jimingquan <mingquan.ji@vesoft.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* fix split brain in raft (#4479)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* fix invalid filter in GetProp make storage crashed (#4568)

Co-authored-by: haowen <19355821+wenhaocs@users.noreply.github.com>

* fix scan vertex/edge do not handle ttl (#4578)

* fix scan vertex/edge do not handle ttl

* use ErrorCode to unify community version and end version

* Fix #1212. Return FoldConstantExprVisitor, if status_ already failed due to found syantax errors. (#4607)

Co-authored-by: jie.wang <38901892+jievince@users.noreply.github.com>

* Avoid fatal when expression illegal. (#4618)

* Fix concurrent exception related to multi-match statement (#4605)

* fix filter executor

* Fix concurrency exception of multi-match statements

fix iterator

fix

small delete

small delete

skip iterator type handle for concurrency

small delete

fix scan edges

small delete

small delete

fix

small delete

small change

small change

fix ut

small fix

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* Prune properties(#4523)

* fix conflict

* extract attribute from properties function (#4604)

* extract attribute from properties function

* fix error

* fix subscript error

* add test case

* process scanEdges

* fix test error

* add unwind & check vidType when executing not validate (#4456)

* Update AppendVerticesExecutor.cpp

fix conflict

* Update AppendVerticesExecutor.cpp

* Replace obsolete RocksDB API (#4395)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* Update PrunePropertiesRule.feature

* remove useless dc (#4533)

* Update PrunePropertiesRule.feature

* fix test error

Co-authored-by: kyle.cao <kyle.cao@vesoft.com>
Co-authored-by: jimingquan <mingquan.ji@vesoft.com>
Co-authored-by: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com>
Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
Co-authored-by: haowen <19355821+wenhaocs@users.noreply.github.com>
Co-authored-by: Cheng Xuntao <7731943+xtcyclist@users.noreply.github.com>
Co-authored-by: jie.wang <38901892+jievince@users.noreply.github.com>
Co-authored-by: shylock <33566796+Shylock-Hg@users.noreply.github.com>
Co-authored-by: Qiaolin Yu <90088090+Qiaolin-Yu@users.noreply.github.com>
  • Loading branch information
10 people authored Sep 13, 2022
1 parent ef6d6a0 commit bb2e684
Show file tree
Hide file tree
Showing 79 changed files with 1,632 additions and 722 deletions.
2 changes: 1 addition & 1 deletion src/common/expression/LabelAttributeExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class LabelAttributeExpression final : public Expression {
}

const Value& eval(ExpressionContext&) override {
LOG(FATAL) << "LabelAttributeExpression has to be rewritten";
DLOG(FATAL) << "LabelAttributeExpression has to be rewritten";
return Value::kNullBadData;
}

Expand Down
20 changes: 11 additions & 9 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ Value GetNeighborsIter::getVertex(const std::string& name) const {
return Value::kNullValue;
}

auto vidVal = getColumn(nebula::kVid);
auto vidVal = getColumn(0);
if (UNLIKELY(!SchemaUtil::isValidVid(vidVal))) {
return Value::kNullBadType;
}
Expand Down Expand Up @@ -502,12 +502,13 @@ Value GetNeighborsIter::getEdge() const {
edge.name = edgeName;

auto type = getEdgeProp(edgeName, kType);
if (!type.isInt()) {
return Value::kNullBadType;
if (type.isInt()) {
edge.type = type.getInt();
} else {
edge.type = 0;
}
edge.type = type.getInt();

auto& srcVal = getColumn(kVid);
auto& srcVal = getColumn(0);
if (!SchemaUtil::isValidVid(srcVal)) {
return Value::kNullBadType;
}
Expand All @@ -520,10 +521,11 @@ Value GetNeighborsIter::getEdge() const {
edge.dst = dstVal;

auto& rank = getEdgeProp(edgeName, kRank);
if (!rank.isInt()) {
return Value::kNullBadType;
if (rank.isInt()) {
edge.ranking = rank.getInt();
} else {
edge.ranking = 0;
}
edge.ranking = rank.getInt();

auto& edgePropMap = currentDs_->edgePropsMap;
auto edgeProp = edgePropMap.find(currentEdgeName());
Expand All @@ -535,7 +537,7 @@ Value GetNeighborsIter::getEdge() const {
DCHECK_EQ(edgeNamePropList.size(), propList.size());
for (size_t i = 0; i < propList.size(); ++i) {
auto propName = edgeNamePropList[i];
if (propName == kSrc || propName == kDst || propName == kRank || propName == kType) {
if (propName == kDst || propName == kRank || propName == kType || propName == kSrc) {
continue;
}
edge.props.emplace(edgeNamePropList[i], propList[i]);
Expand Down
9 changes: 9 additions & 0 deletions src/graph/context/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ class Result final {
}
}

std::vector<std::string> getColNames() const {
auto& ds = value();
if (ds.isDataSet()) {
return ds.getDataSet().colNames;
}

return {};
}

private:
friend class ResultBuilder;
friend class ExecutionContext;
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,10 @@ bool Executor::movable(const Variable *var) {
return false;
}
if (node()->loopLayers() != 0) {
// Guaranteed forward compatibility of go statement execution behavior
if (node()->kind() == PlanNode::Kind::kFilter) {
return true;
}
// The lifetime of loop body is managed by Loop node
return false;
}
Expand Down
36 changes: 23 additions & 13 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ struct Vid<std::string> {
};

template <typename VidType>
DataSet buildRequestDataSet(const SpaceInfo &space,
QueryExpressionContext &exprCtx,
Iterator *iter,
Expression *expr,
bool dedup) {
StatusOr<DataSet> buildRequestDataSet(const SpaceInfo &space,
QueryExpressionContext &exprCtx,
Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher) {
DCHECK(iter && expr) << "iter=" << iter << ", expr=" << expr;
nebula::DataSet vertices({kVid});
auto s = iter->size();
Expand All @@ -54,11 +55,19 @@ DataSet buildRequestDataSet(const SpaceInfo &space,

for (; iter->valid(); iter->next()) {
auto vid = expr->eval(exprCtx(iter));
if (!SchemaUtil::isValidVid(vid, vidType)) {
LOG(WARNING) << "Mismatched vid type: " << vid.type()
<< ", space vid type: " << SchemaUtil::typeToString(vidType);
if (vid.empty()) {
continue;
}
if (!SchemaUtil::isValidVid(vid, vidType)) {
if (isCypher) {
continue;
}
std::stringstream ss;
ss << "`" << vid.toString() << "', the srcs should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << vid.type()
<< "'";
return Status::Error(ss.str());
}
if (dedup && !uniqueSet.emplace(Vid<VidType>::value(vid)).second) {
continue;
}
Expand All @@ -73,16 +82,17 @@ bool StorageAccessExecutor::isIntVidType(const SpaceInfo &space) const {
return (*space.spaceDesc.vid_type_ref()).type == nebula::cpp2::PropertyType::INT64;
}

DataSet StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *iter,
Expression *expr,
bool dedup) {
StatusOr<DataSet> StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher) {
const auto &space = qctx()->rctx()->session()->space();
QueryExpressionContext exprCtx(qctx()->ectx());

if (isIntVidType(space)) {
return internal::buildRequestDataSet<int64_t>(space, exprCtx, iter, expr, dedup);
return internal::buildRequestDataSet<int64_t>(space, exprCtx, iter, expr, dedup, isCypher);
}
return internal::buildRequestDataSet<std::string>(space, exprCtx, iter, expr, dedup);
return internal::buildRequestDataSet<std::string>(space, exprCtx, iter, expr, dedup, isCypher);
}

std::string StorageAccessExecutor::getStorageDetail(
Expand Down
5 changes: 4 additions & 1 deletion src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ class StorageAccessExecutor : public Executor {

bool isIntVidType(const SpaceInfo &space) const;

DataSet buildRequestDataSetByVidType(Iterator *iter, Expression *expr, bool dedup);
StatusOr<DataSet> buildRequestDataSetByVidType(Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher = false);
};

} // namespace graph
Expand Down
49 changes: 43 additions & 6 deletions src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,32 @@
using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetPropResponse;

DECLARE_bool(optimize_appendvertices);
namespace nebula {
namespace graph {
folly::Future<Status> AppendVerticesExecutor::execute() {
return appendVertices();
}

DataSet AppendVerticesExecutor::buildRequestDataSet(const AppendVertices *av) {
StatusOr<DataSet> AppendVerticesExecutor::buildRequestDataSet(const AppendVertices *av) {
if (av == nullptr) {
return nebula::DataSet({kVid});
}
auto valueIter = ectx_->getResult(av->inputVar()).iter();
return buildRequestDataSetByVidType(valueIter.get(), av->src(), av->dedup());
return buildRequestDataSetByVidType(valueIter.get(), av->src(), av->dedup(), true);
}

folly::Future<Status> AppendVerticesExecutor::appendVertices() {
SCOPED_TIMER(&execTime_);

auto *av = asNode<AppendVertices>(node());
StorageClient *storageClient = qctx()->getStorageClient();
if (FLAGS_optimize_appendvertices && av != nullptr && av->props() == nullptr) {
return handleNullProp(av);
}

DataSet vertices = buildRequestDataSet(av);
StorageClient *storageClient = qctx()->getStorageClient();
auto res = buildRequestDataSet(av);
NG_RETURN_IF_ERROR(res);
auto vertices = std::move(res).value();
if (vertices.rows.empty()) {
return finish(ResultBuilder().value(Value(DataSet(av->colNames()))).build());
}
Expand Down Expand Up @@ -67,6 +71,39 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
});
}

Status AppendVerticesExecutor::handleNullProp(const AppendVertices *av) {
auto iter = ectx_->getResult(av->inputVar()).iter();
auto *src = av->src();

auto size = iter->size();
DataSet ds;
ds.colNames = av->colNames();
ds.rows.reserve(size);

QueryExpressionContext ctx(ectx_);
bool canBeMoved = movable(av->inputVars().front());

for (; iter->valid(); iter->next()) {
const auto &vid = src->eval(ctx(iter.get()));
if (vid.empty()) {
continue;
}
Vertex vertex;
vertex.vid = vid;
if (!av->trackPrevPath()) {
Row row;
row.values.emplace_back(std::move(vertex));
ds.rows.emplace_back(std::move(row));
} else {
Row row;
row = canBeMoved ? iter->moveRow() : *iter->row();
row.values.emplace_back(std::move(vertex));
ds.rows.emplace_back(std::move(row));
}
}
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}

Status AppendVerticesExecutor::handleResp(
storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp) {
auto result = handleCompleteness(rpcResp, FLAGS_accept_partial_success);
Expand Down
4 changes: 3 additions & 1 deletion src/graph/executor/query/AppendVerticesExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ class AppendVerticesExecutor final : public GetPropExecutor {
folly::Future<Status> execute() override;

private:
DataSet buildRequestDataSet(const AppendVertices *gv);
StatusOr<DataSet> buildRequestDataSet(const AppendVertices *gv);

folly::Future<Status> appendVertices();

Status handleResp(storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp);

Status handleNullProp(const AppendVertices *av);

folly::Future<Status> handleRespMultiJobs(
storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp);

Expand Down
61 changes: 43 additions & 18 deletions src/graph/executor/query/FilterExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,32 +78,57 @@ StatusOr<DataSet> FilterExecutor::handleJob(size_t begin, size_t end, Iterator *

Status FilterExecutor::handleSingleJobFilter() {
auto *filter = asNode<Filter>(node());
Result result = ectx_->getResult(filter->inputVar());
auto inputVar = filter->inputVar();
// Use the userCount of the operator's inputVar at runtime to determine whether concurrent
// read-write conflicts exist, and if so, copy the data
bool canMoveData = movable(inputVar);
Result result = ectx_->getResult(inputVar);
auto *iter = result.iterRef();

// Always reuse getNeighbors's dataset to avoid some go statement execution plan related issues
if (iter->isGetNeighborsIter()) {
canMoveData = true;
}
ResultBuilder builder;
builder.value(result.valuePtr());
QueryExpressionContext ctx(ectx_);
auto condition = filter->condition();
while (iter->valid()) {
auto val = condition->eval(ctx(iter));
if (val.isBadNull() || (!val.empty() && !val.isImplicitBool() && !val.isNull())) {
return Status::Error("Wrong type result, the type should be NULL, EMPTY, BOOL");
}
if (val.empty() || val.isNull() || (val.isImplicitBool() && !val.implicitBool())) {
if (UNLIKELY(filter->needStableFilter())) {
iter->erase();
if (LIKELY(canMoveData)) {
builder.value(result.valuePtr());
while (iter->valid()) {
auto val = condition->eval(ctx(iter));
if (val.isBadNull() || (!val.empty() && !val.isImplicitBool() && !val.isNull())) {
return Status::Error("Wrong type result, the type should be NULL, EMPTY, BOOL");
}
if (val.empty() || val.isNull() || (val.isImplicitBool() && !val.implicitBool())) {
if (UNLIKELY(filter->needStableFilter())) {
iter->erase();
} else {
iter->unstableErase();
}
} else {
iter->unstableErase();
iter->next();
}
} else {
iter->next();
}
}

iter->reset();
builder.iter(std::move(result).iter());
return finish(builder.build());
iter->reset();
builder.iter(std::move(result).iter());
return finish(builder.build());
} else {
DataSet ds;
ds.colNames = result.getColNames();
ds.rows.reserve(iter->size());
for (; iter->valid(); iter->next()) {
auto val = condition->eval(ctx(iter));
if (val.isBadNull() || (!val.empty() && !val.isImplicitBool() && !val.isNull())) {
return Status::Error("Wrong type result, the type should be NULL, EMPTY, BOOL");
}
if (val.isImplicitBool() && val.implicitBool()) {
Row row;
row = *iter->row();
ds.rows.emplace_back(std::move(row));
}
}
return finish(builder.value(Value(std::move(ds))).iter(Iterator::Kind::kProp).build());
}
}

} // namespace graph
Expand Down
24 changes: 22 additions & 2 deletions src/graph/executor/query/GetEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "graph/executor/query/GetEdgesExecutor.h"

#include "graph/planner/plan/Query.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
Expand All @@ -17,19 +18,36 @@ folly::Future<Status> GetEdgesExecutor::execute() {
return getEdges();
}

DataSet GetEdgesExecutor::buildRequestDataSet(const GetEdges *ge) {
StatusOr<DataSet> GetEdgesExecutor::buildRequestDataSet(const GetEdges *ge) {
auto valueIter = ectx_->getResult(ge->inputVar()).iter();
QueryExpressionContext exprCtx(qctx()->ectx());

nebula::DataSet edges({kSrc, kType, kRank, kDst});
edges.rows.reserve(valueIter->size());
std::unordered_set<std::tuple<Value, Value, Value, Value>> uniqueEdges;
uniqueEdges.reserve(valueIter->size());

const auto &space = qctx()->rctx()->session()->space();
const auto &vidType = *(space.spaceDesc.vid_type_ref());
for (; valueIter->valid(); valueIter->next()) {
auto type = ge->type()->eval(exprCtx(valueIter.get()));
auto src = ge->src()->eval(exprCtx(valueIter.get()));
auto dst = ge->dst()->eval(exprCtx(valueIter.get()));
auto rank = ge->ranking()->eval(exprCtx(valueIter.get()));
if (!SchemaUtil::isValidVid(src, vidType)) {
std::stringstream ss;
ss << "`" << src.toString() << "', the src should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << src.type()
<< "'";
return Status::Error(ss.str());
}
if (!SchemaUtil::isValidVid(dst, vidType)) {
std::stringstream ss;
ss << "`" << dst.toString() << "', the dst should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << dst.type()
<< "'";
return Status::Error(ss.str());
}
type = type < 0 ? -type : type;
auto edgeKey = std::make_tuple(src, type, rank, dst);
if (ge->dedup() && !uniqueEdges.emplace(std::move(edgeKey)).second) {
Expand All @@ -52,7 +70,9 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
return Status::Error("ptr is nullptr");
}

auto edges = buildRequestDataSet(ge);
auto res = buildRequestDataSet(ge);
NG_RETURN_IF_ERROR(res);
auto edges = std::move(res).value();

if (edges.rows.empty()) {
// TODO: add test for empty input.
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetEdgesExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class GetEdgesExecutor final : public GetPropExecutor {
folly::Future<Status> execute() override;

private:
DataSet buildRequestDataSet(const GetEdges *ge);
StatusOr<DataSet> buildRequestDataSet(const GetEdges *ge);

folly::Future<Status> getEdges();
};
Expand Down
Loading

0 comments on commit bb2e684

Please sign in to comment.