Skip to content

Commit

Permalink
bug fix: shutdown meta server before job manager init, may core (#2332)
Browse files Browse the repository at this point in the history
* bug fix: shutdown meta server before job manager init, may core

* check if there is a stop before init, and if there is no init before stop

* CI issue

* fix clang-9 CI complait

* clang-9 complaint keyword macro, lower compiler complaint clang-9 pragma... tired

Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
  • Loading branch information
lionel.liu@vesoft.com and dangleptr authored Sep 15, 2020
1 parent 7678846 commit a571719
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
4 changes: 3 additions & 1 deletion src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ void signalHandler(int sig) {
}
{
auto gJobMgr = nebula::meta::JobManager::getInstance();
gJobMgr->shutDown();
if (gJobMgr) {
gJobMgr->shutDown();
}
}
if (gKVStore) {
gKVStore->stop();
Expand Down
18 changes: 15 additions & 3 deletions src/meta/processors/jobMan/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,21 @@ bool JobManager::init(nebula::kvstore::KVStore* store) {
if (store == nullptr) {
return false;
}
std::lock_guard<std::mutex> lk(statusGuard_);
if (status_ != Status::NOT_START) {
return false;
}
kvStore_ = store;
pool_ = std::make_unique<nebula::thread::GenericThreadPool>();
pool_->start(FLAGS_dispatch_thread_num);

queue_ = std::make_unique<folly::UMPSCQueue<int32_t, true>>();
bgThread_ = std::make_unique<thread::GenericWorker>();
CHECK(bgThread_->start());

status_ = Status::RUNNING;
bgThread_->addTask(&JobManager::runJobBackground, this);
LOG(INFO) << "JobManager initialized";
return true;
}

Expand All @@ -59,7 +66,12 @@ JobManager::~JobManager() {

void JobManager::shutDown() {
LOG(INFO) << "JobManager::shutDown() begin";
shutDown_ = true;
std::lock_guard<std::mutex> lk(statusGuard_);
if (status_ != Status::RUNNING) { // in case of shutdown more than once
LOG(INFO) << "JobManager not running, exit";
return;
}
status_ = Status::STOPPED;
pool_->stop();
bgThread_->stop();
bgThread_->wait();
Expand All @@ -68,10 +80,10 @@ void JobManager::shutDown() {

void JobManager::runJobBackground() {
LOG(INFO) << "JobManager::runJobBackground() enter";
while (!shutDown_) {
while (status_ == Status::RUNNING) {
int32_t iJob = 0;
while (!queue_->try_dequeue(iJob)) {
if (shutDown_) {
if (status_ == Status::STOPPED) {
LOG(INFO) << "[JobManager] detect shutdown called, exit";
break;
}
Expand Down
13 changes: 10 additions & 3 deletions src/meta/processors/jobMan/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace nebula {
namespace meta {

class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovable {
using ResultCode = nebula::kvstore::ResultCode;
friend class JobManagerTest;
FRIEND_TEST(JobManagerTest, reserveJobId);
FRIEND_TEST(JobManagerTest, buildJobDescription);
FRIEND_TEST(JobManagerTest, addJob);
Expand All @@ -31,12 +33,16 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
FRIEND_TEST(JobManagerTest, showJob);
FRIEND_TEST(JobManagerTest, recoverJob);

using ResultCode = nebula::kvstore::ResultCode;

public:
~JobManager();
static JobManager* getInstance();

enum class Status {
NOT_START,
RUNNING,
STOPPED,
};

bool init(nebula::kvstore::KVStore* store);

void shutDown();
Expand All @@ -62,7 +68,8 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
std::unique_ptr<folly::UMPSCQueue<int32_t, true>> queue_;
std::unique_ptr<thread::GenericWorker> bgThread_;

bool shutDown_{false};
std::mutex statusGuard_;
Status status_{Status::NOT_START};
nebula::kvstore::KVStore* kvStore_{nullptr};
std::unique_ptr<nebula::thread::GenericThreadPool> pool_{nullptr};
};
Expand Down
1 change: 1 addition & 0 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class JobManagerTest : public ::testing::Test {
TestUtils::createSomeHosts(kv_.get());
TestUtils::assembleSpace(kv_.get(), 1, 1);
jobMgr = JobManager::getInstance();
jobMgr->status_ = JobManager::Status::NOT_START;
jobMgr->init(kv_.get());
LOG(INFO) << "exit" << __func__;
}
Expand Down

0 comments on commit a571719

Please sign in to comment.