Skip to content

Commit

Permalink
implement compilation cancellation, add logs to tests, trying to fix …
Browse files Browse the repository at this point in the history
…test KIKIMR-19237
  • Loading branch information
gridnevvvit committed Sep 4, 2023
1 parent e66c95c commit 6150a51
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 19 deletions.
14 changes: 10 additions & 4 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace NKikimr::NKqp::NPrivateEvents {
struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, TInstant deadline,
TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {},
TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
: UserToken(userToken)
, Uid(uid)
Expand All @@ -22,7 +22,9 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
, Deadline(deadline)
, DbCounters(dbCounters)
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState)) {
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
{
Y_ENSURE(Uid.Defined() != Query.Defined());
}

Expand All @@ -38,20 +40,23 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
NLWTrace::TOrbit Orbit;

TKqpTempTablesState::TConstPtr TempTablesState;
std::shared_ptr<std::atomic<bool>> IntrestedInResult;
};

struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& uid,
const TMaybe<TKqpQueryId>& query, TInstant deadline,
TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {},
TKqpDbCountersPtr dbCounters, std::shared_ptr<std::atomic<bool>> intrestedInResult, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
: UserToken(userToken)
, Uid(uid)
, Query(query)
, Deadline(deadline)
, DbCounters(dbCounters)
, Orbit(std::move(orbit))
, TempTablesState(std::move(tempTablesState)) {
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
{
}

TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
Expand All @@ -64,6 +69,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
NLWTrace::TOrbit Orbit;

TKqpTempTablesState::TConstPtr TempTablesState;
std::shared_ptr<std::atomic<bool>> IntrestedInResult;
};

struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
Expand Down
32 changes: 26 additions & 6 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ class TKqpQueryCache {
struct TKqpCompileRequest {
TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters,
ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {},
ui64 cookie, std::shared_ptr<std::atomic<bool>> intrestedInResult,
NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {},
TKqpTempTablesState::TConstPtr tempTablesState = {})
: Sender(sender)
, Query(std::move(query))
Expand All @@ -191,6 +192,7 @@ struct TKqpCompileRequest {
, CompileServiceSpan(std::move(span))
, Cookie(cookie)
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
{}

TActorId Sender;
Expand All @@ -206,6 +208,11 @@ struct TKqpCompileRequest {
NWilson::TSpan CompileServiceSpan;
ui64 Cookie;
TKqpTempTablesState::TConstPtr TempTablesState;
std::shared_ptr<std::atomic<bool>> IntrestedInResult;

bool IsIntrestedInResult() const {
return IntrestedInResult->load();
}
};

class TKqpRequestsQueue {
Expand Down Expand Up @@ -236,13 +243,26 @@ class TKqpRequestsQueue {
}

TMaybe<TKqpCompileRequest> Dequeue() {
for (auto it = Queue.begin(); it != Queue.end(); ++it) {
auto it = Queue.begin();

while (it != Queue.end()) {
auto& request = *it;
auto curIt = it++;

if (!request.IsIntrestedInResult()) {
auto result = std::move(request);
LOG_DEBUG(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
"Drop compilation request because session is not longer wait for response");
QueryIndex[result.Query].erase(curIt);
Queue.erase(curIt);
continue;
}

if (!ActiveRequests.contains(request.Query)) {
auto result = std::move(request);

QueryIndex[result.Query].erase(it);
Queue.erase(it);
QueryIndex[result.Query].erase(curIt);
Queue.erase(curIt);

return result;
}
Expand Down Expand Up @@ -531,7 +551,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
request.KeepInCache, request.UserToken, request.Deadline, dbCounters,
ev->Cookie,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
std::move(ev->Get()->Orbit), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState));

if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Expand Down Expand Up @@ -581,7 +601,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
true, request.UserToken, request.Deadline, dbCounters,
ev->Cookie,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState));

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
return true;
}

std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() {
std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) {
TMaybe<TKqpQueryId> query;
TMaybe<TString> uid;

Expand Down Expand Up @@ -161,10 +161,10 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest()
}

return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, uid,
std::move(query), keepInCache, compileDeadline, DbCounters, std::move(Orbit), TempTablesState);
std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState);
}

std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest() {
std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie) {
YQL_ENSURE(CompileResult);
TMaybe<TKqpQueryId> query;
TMaybe<TString> uid;
Expand Down Expand Up @@ -197,7 +197,7 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid,
CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit), TempTablesState);
CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState);
}

void TKqpQueryState::AddOffsetsToTransaction() {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,10 @@ class TKqpQueryState : public TNonCopyable {
// same the context of the compiled query to the query state.
bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev);
// build the compilation request.
std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest();
std::unique_ptr<TEvKqp::TEvCompileRequest> BuildCompileRequest(std::shared_ptr<std::atomic<bool>> cookie);
// TODO(gvit): get rid of code duplication in these requests,
// use only one of these requests.
std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest();
std::unique_ptr<TEvKqp::TEvRecompileRequest> BuildReCompileRequest(std::shared_ptr<std::atomic<bool>> cookie);

const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& GetYdbParameters() const {
return RequestEv->GetYdbParameters();
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class TTimerGuard {
RequestCounters->Counters = Counters;
RequestCounters->DbCounters = Settings.DbCounters;
RequestCounters->TxProxyMon = MakeIntrusive<NTxProxy::TTxProxyMon>(AppData()->Counters);
CompilationCookie = std::make_shared<std::atomic<bool>>(true);

FillSettings.AllResultsBytesLimit = Nothing();
FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef();
Expand Down Expand Up @@ -459,7 +460,7 @@ class TTimerGuard {

void CompileQuery() {
YQL_ENSURE(QueryState);
auto ev = QueryState->BuildCompileRequest();
auto ev = QueryState->BuildCompileRequest(CompilationCookie);
LOG_D("Sending CompileQuery request");
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
Become(&TKqpSessionActor::CompileState);
Expand All @@ -479,7 +480,7 @@ class TTimerGuard {

// table versions are not the same. need the query recompilation.
if (!QueryState->EnsureTableVersions(*response)) {
auto ev = QueryState->BuildReCompileRequest();
auto ev = QueryState->BuildReCompileRequest(CompilationCookie);
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
return;
}
Expand Down Expand Up @@ -1707,6 +1708,11 @@ class TTimerGuard {
if (isFinal)
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);

if (isFinal) {
// no longer intrested in any compilation responses
CompilationCookie->store(false);
}

if (isFinal) {
Transactions.FinalCleanup();
Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize());
Expand Down Expand Up @@ -2151,6 +2157,7 @@ class TTimerGuard {
TKqpTempTablesState TempTablesState;

NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
std::shared_ptr<std::atomic<bool>> CompilationCookie;
};

} // namespace
Expand Down
11 changes: 10 additions & 1 deletion ydb/core/kqp/ut/service/kqp_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ Y_UNIT_TEST_SUITE(KqpService) {

Y_UNIT_TEST(CloseSessionsWithLoad) {
auto kikimr = std::make_shared<TKikimrRunner>();
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);

auto db = kikimr->GetTableClient();

const ui32 SessionsCount = 50;
Expand All @@ -84,11 +89,13 @@ Y_UNIT_TEST_SUITE(KqpService) {
NPar::LocalExecutor().ExecRange([kikimr, sessions, WaitDuration](int id) mutable {
if (id == (i32)sessions.size()) {
Sleep(WaitDuration);

Cerr << "start sessions close....." << Endl;
for (ui32 i = 0; i < sessions.size(); ++i) {
sessions[i].Close();
}

Cerr << "finished sessions close....." << Endl;

return;
}

Expand Down Expand Up @@ -120,6 +127,8 @@ Y_UNIT_TEST_SUITE(KqpService) {

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx()).GetValueSync();
if (!result.IsSuccess()) {
Sleep(TDuration::Seconds(5));
Cerr << "received non-success status for session " << id << Endl;
return;
}

Expand Down

0 comments on commit 6150a51

Please sign in to comment.