diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index e36b103707b8..6af53925d02f 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -13,7 +13,7 @@ namespace NKikimr::NKqp::NPrivateEvents { struct TEvCompileRequest: public TEventLocal { TEvCompileRequest(const TIntrusiveConstPtr& userToken, const TMaybe& uid, TMaybe&& query, bool keepInCache, TInstant deadline, - TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, + TKqpDbCountersPtr dbCounters, std::shared_ptr> intrestedInResult, NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) : UserToken(userToken) , Uid(uid) @@ -22,7 +22,9 @@ struct TEvCompileRequest: public TEventLocal> IntrestedInResult; }; struct TEvRecompileRequest: public TEventLocal { TEvRecompileRequest(const TIntrusiveConstPtr& userToken, const TString& uid, const TMaybe& query, TInstant deadline, - TKqpDbCountersPtr dbCounters, NLWTrace::TOrbit orbit = {}, + TKqpDbCountersPtr dbCounters, std::shared_ptr> intrestedInResult, NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) : UserToken(userToken) , Uid(uid) @@ -51,7 +54,9 @@ struct TEvRecompileRequest: public TEventLocal UserToken; @@ -64,6 +69,7 @@ struct TEvRecompileRequest: public TEventLocal> IntrestedInResult; }; struct TEvCompileResponse: public TEventLocal { diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 7bd700c4a86e..24761a849a46 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -178,7 +178,8 @@ class TKqpQueryCache { struct TKqpCompileRequest { TKqpCompileRequest(const TActorId& sender, const TString& uid, TKqpQueryId query, bool keepInCache, const TIntrusiveConstPtr& userToken, const TInstant& deadline, TKqpDbCountersPtr dbCounters, - ui64 cookie, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}, + ui64 cookie, std::shared_ptr> intrestedInResult, + NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}, TKqpTempTablesState::TConstPtr tempTablesState = {}) : Sender(sender) , Query(std::move(query)) @@ -191,6 +192,7 @@ struct TKqpCompileRequest { , CompileServiceSpan(std::move(span)) , Cookie(cookie) , TempTablesState(std::move(tempTablesState)) + , IntrestedInResult(std::move(intrestedInResult)) {} TActorId Sender; @@ -206,6 +208,11 @@ struct TKqpCompileRequest { NWilson::TSpan CompileServiceSpan; ui64 Cookie; TKqpTempTablesState::TConstPtr TempTablesState; + std::shared_ptr> IntrestedInResult; + + bool IsIntrestedInResult() const { + return IntrestedInResult->load(); + } }; class TKqpRequestsQueue { @@ -236,13 +243,26 @@ class TKqpRequestsQueue { } TMaybe Dequeue() { - for (auto it = Queue.begin(); it != Queue.end(); ++it) { + auto it = Queue.begin(); + + while (it != Queue.end()) { auto& request = *it; + auto curIt = it++; + + if (!request.IsIntrestedInResult()) { + auto result = std::move(request); + LOG_DEBUG(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, + "Drop compilation request because session is not longer wait for response"); + QueryIndex[result.Query].erase(curIt); + Queue.erase(curIt); + continue; + } + if (!ActiveRequests.contains(request.Query)) { auto result = std::move(request); - QueryIndex[result.Query].erase(it); - Queue.erase(it); + QueryIndex[result.Query].erase(curIt); + Queue.erase(curIt); return result; } @@ -531,7 +551,7 @@ class TKqpCompileService : public TActorBootstrapped { TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), request.KeepInCache, request.UserToken, request.Deadline, dbCounters, - ev->Cookie, + ev->Cookie, std::move(ev->Get()->IntrestedInResult), std::move(ev->Get()->Orbit), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState)); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -581,7 +601,7 @@ class TKqpCompileService : public TActorBootstrapped { TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, true, request.UserToken, request.Deadline, dbCounters, - ev->Cookie, + ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(), std::move(CompileServiceSpan), std::move(ev->Get()->TempTablesState)); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index ccd842f5048b..8bc47fa74f4e 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -120,7 +120,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) { return true; } -std::unique_ptr TKqpQueryState::BuildCompileRequest() { +std::unique_ptr TKqpQueryState::BuildCompileRequest(std::shared_ptr> cookie) { TMaybe query; TMaybe uid; @@ -161,10 +161,10 @@ std::unique_ptr TKqpQueryState::BuildCompileRequest() } return std::make_unique(UserToken, uid, - std::move(query), keepInCache, compileDeadline, DbCounters, std::move(Orbit), TempTablesState); + std::move(query), keepInCache, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState); } -std::unique_ptr TKqpQueryState::BuildReCompileRequest() { +std::unique_ptr TKqpQueryState::BuildReCompileRequest(std::shared_ptr> cookie) { YQL_ENSURE(CompileResult); TMaybe query; TMaybe uid; @@ -197,7 +197,7 @@ std::unique_ptr TKqpQueryState::BuildReCompileReque } return std::make_unique(UserToken, CompileResult->Uid, - CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit), TempTablesState); + CompileResult->Query, compileDeadline, DbCounters, std::move(cookie), std::move(Orbit), TempTablesState); } void TKqpQueryState::AddOffsetsToTransaction() { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 21bdaafaaff4..36c61a07588b 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -354,10 +354,10 @@ class TKqpQueryState : public TNonCopyable { // same the context of the compiled query to the query state. bool SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev); // build the compilation request. - std::unique_ptr BuildCompileRequest(); + std::unique_ptr BuildCompileRequest(std::shared_ptr> cookie); // TODO(gvit): get rid of code duplication in these requests, // use only one of these requests. - std::unique_ptr BuildReCompileRequest(); + std::unique_ptr BuildReCompileRequest(std::shared_ptr> cookie); const ::google::protobuf::Map& GetYdbParameters() const { return RequestEv->GetYdbParameters(); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index d0705e0132af..ad621575501e 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -170,6 +170,7 @@ class TTimerGuard { RequestCounters->Counters = Counters; RequestCounters->DbCounters = Settings.DbCounters; RequestCounters->TxProxyMon = MakeIntrusive(AppData()->Counters); + CompilationCookie = std::make_shared>(true); FillSettings.AllResultsBytesLimit = Nothing(); FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef(); @@ -459,7 +460,7 @@ class TTimerGuard { void CompileQuery() { YQL_ENSURE(QueryState); - auto ev = QueryState->BuildCompileRequest(); + auto ev = QueryState->BuildCompileRequest(CompilationCookie); LOG_D("Sending CompileQuery request"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); Become(&TKqpSessionActor::CompileState); @@ -479,7 +480,7 @@ class TTimerGuard { // table versions are not the same. need the query recompilation. if (!QueryState->EnsureTableVersions(*response)) { - auto ev = QueryState->BuildReCompileRequest(); + auto ev = QueryState->BuildReCompileRequest(CompilationCookie); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); return; } @@ -1707,6 +1708,11 @@ class TTimerGuard { if (isFinal) Counters->ReportSessionActorClosedRequest(Settings.DbCounters); + if (isFinal) { + // no longer intrested in any compilation responses + CompilationCookie->store(false); + } + if (isFinal) { Transactions.FinalCleanup(); Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize()); @@ -2151,6 +2157,7 @@ class TTimerGuard { TKqpTempTablesState TempTablesState; NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; + std::shared_ptr> CompilationCookie; }; } // namespace diff --git a/ydb/core/kqp/ut/service/kqp_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_service_ut.cpp index 4eed1d3124fe..dd462b00e4b2 100644 --- a/ydb/core/kqp/ut/service/kqp_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_service_ut.cpp @@ -67,6 +67,11 @@ Y_UNIT_TEST_SUITE(KqpService) { Y_UNIT_TEST(CloseSessionsWithLoad) { auto kikimr = std::make_shared(); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG); + kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG); + auto db = kikimr->GetTableClient(); const ui32 SessionsCount = 50; @@ -84,11 +89,13 @@ Y_UNIT_TEST_SUITE(KqpService) { NPar::LocalExecutor().ExecRange([kikimr, sessions, WaitDuration](int id) mutable { if (id == (i32)sessions.size()) { Sleep(WaitDuration); - + Cerr << "start sessions close....." << Endl; for (ui32 i = 0; i < sessions.size(); ++i) { sessions[i].Close(); } + Cerr << "finished sessions close....." << Endl; + return; } @@ -120,6 +127,8 @@ Y_UNIT_TEST_SUITE(KqpService) { auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx()).GetValueSync(); if (!result.IsSuccess()) { + Sleep(TDuration::Seconds(5)); + Cerr << "received non-success status for session " << id << Endl; return; }