Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [QC-858] Single CheckRunner per workflow #2061

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ install(PROGRAMS script/o2-qc-functional-test.sh
set(TEST_FILES
"testSharedConfig.json"
"testEmptyConfig.json"
"testOnlyTaskConfig.json"
"testCheckWorkflow.json"
"testTrendingTask.json"
"testWorkflow.json"
Expand Down
1 change: 0 additions & 1 deletion Framework/include/QualityControl/CheckConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ struct CheckConfig {
UpdatePolicyType policyType = UpdatePolicyType::OnAny;
std::vector<std::string> objectNames{}; // fixme: if object names are empty, allObjects are true, consider reducing to one var
bool allObjects = false;
bool allowBeautify = false;
framework::Inputs inputSpecs{};
framework::OutputSpec qoSpec{ "XXX", "INVALID" };
std::string conditionUrl{};
Expand Down
25 changes: 4 additions & 21 deletions Framework/include/QualityControl/CheckRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "QualityControl/CheckRunnerConfig.h"
#include "QualityControl/Check.h"
#include "QualityControl/MonitorObject.h"
#include "QualityControl/QualityObject.h"
#include "QualityControl/UpdatePolicyManager.h"

namespace o2::quality_control::core
Expand Down Expand Up @@ -79,7 +78,6 @@ namespace o2::quality_control::checker
class CheckRunner : public framework::Task
{
public:
/// Constructor
/**
* \brief CheckRunner constructor
*
Expand All @@ -88,20 +86,9 @@ class CheckRunner : public framework::Task
* Group check assumes that the input of the checks is the same!
*
* @param checkRunnerConfig configuration of CheckRunner
* @param checkConfigs configuration of all Checks that should run in this data processor
*/
CheckRunner(CheckRunnerConfig, const std::vector<CheckConfig>& checkConfigs);

/**
* \brief CheckRunner constructor
*
* Create a sink for the Input. It is expected to receive Monitor Object to store.
* It will not run any checks on a given input.
*
* @param checkRunnerConfig configuration of CheckRunner
* @param input Monitor Object input spec.
*/
CheckRunner(CheckRunnerConfig, o2::framework::InputSpec input);
CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs);

/// Destructor
~CheckRunner() override;
Expand All @@ -118,19 +105,16 @@ class CheckRunner : public framework::Task
framework::Inputs getInputs() { return mInputs; };
framework::Outputs getOutputs() { return mOutputs; };

void setTaskStoreSet(std::unordered_set<std::string> storeSet) { mInputStoreSet = storeSet; }
std::string getDeviceName() { return mDeviceName; };

static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; }
static std::string createCheckRunnerIdString() { return "qc-check"; };
static std::string createCheckRunnerName(const std::vector<CheckConfig>& checks);
static std::string createSinkCheckRunnerName(o2::framework::InputSpec input);
static std::string createCheckRunnerFacility(std::string deviceName);
static std::string createCheckRunnerName();

/// \brief Compute the detector name to be used for this checkrunner.
/// Compute the detector name to be used for this checkrunner.
/// If all checks belong to the same detector we use it, otherwise we use "MANY"
static std::string getDetectorName(const std::vector<CheckConfig> checks);
static std::string getDetectorName(const std::vector<CheckConfig>& checks);

private:
/**
Expand Down Expand Up @@ -221,8 +205,7 @@ class CheckRunner : public framework::Task
std::shared_ptr<Activity> mActivity; // shareable with the Checks
CheckRunnerConfig mConfig;
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;
std::unordered_set<std::string> mInputStoreSet;
std::vector<std::shared_ptr<MonitorObject>> mMonitorObjectStoreVector;
std::vector<std::shared_ptr<MonitorObject>> mToBeStored;
UpdatePolicyManager updatePolicyManager;
bool mReceivedEOS = false;

Expand Down
12 changes: 1 addition & 11 deletions Framework/include/QualityControl/CheckRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,7 @@ class CheckRunnerFactory
CheckRunnerFactory() = default;
virtual ~CheckRunnerFactory() = default;

static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector = {});

/*
* \brief Create a CheckRunner sink DPL device.
*
* The purpose of this device is to receive and store the MO from task.
*
* @param input InputSpec with the content to store
* @param configurationSource
*/
static framework::DataProcessorSpec createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input);
static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs);

static CheckRunnerConfig extractConfig(const core::CommonSpec&);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class InfrastructureGenerator
const std::string& mergingMode, size_t resetAfterCycles,
std::string monitoringUrl, const std::string& detectorName,
std::vector<size_t> mergersPerLayer, bool enableMovingWindows);
static void generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generateCheckRunner(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec, bool hasActiveTasks);
static void generateAggregator(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generatePostProcessing(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
};
Expand Down
11 changes: 0 additions & 11 deletions Framework/src/Check.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ QualityObjectsType Check::check(std::map<std::string, std::shared_ptr<MonitorObj

void Check::beautify(std::map<std::string, std::shared_ptr<MonitorObject>>& moMap, const Quality& quality)
{
if (!mCheckConfig.allowBeautify) {
return;
}

for (auto const& item : moMap) {
try {
mCheckInterface->beautify(item.second /*mo*/, quality);
Expand Down Expand Up @@ -236,12 +232,6 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec&
}
}

bool allowBeautify = checkSpec.dataSources.size() <= 1;
if (!allowBeautify) {
// See QC-299 for details
ILOG(Warning, Devel) << "Beautification disabled because more than one source is used in this Check (" << checkSpec.checkName << ")" << ENDM;
Barthelemy marked this conversation as resolved.
Show resolved Hide resolved
}

return {
checkSpec.checkName,
checkSpec.moduleName,
Expand All @@ -251,7 +241,6 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec&
updatePolicy,
std::move(objectNames),
checkAllObjects,
allowBeautify,
std::move(inputs),
createOutputSpec(checkSpec.checkName),
commonSpec.conditionDBUrl
Expand Down
87 changes: 12 additions & 75 deletions Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -69,56 +69,9 @@ std::size_t CheckRunner::hash(const std::string& inputString)
return checksum;
}

std::string CheckRunner::createCheckRunnerName(const std::vector<CheckConfig>& checks)
std::string CheckRunner::createCheckRunnerName()
{
static const std::string alphanumeric =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const int NAME_LEN = 4;
std::string name(CheckRunner::createCheckRunnerIdString() + "-" + getDetectorName(checks) + "-");

if (checks.size() == 1) {
// If single check, use the check name
name += checks[0].name;
} else {
std::string hash_string;
std::vector<std::string> names;
// Fill vector with check names
for (const auto& c : checks) {
names.push_back(c.name);
}
// Be sure that after configuration shuffle, the name will be the same
std::sort(names.begin(), names.end());

// Create a single string and hash it
for (auto& n : names) {
hash_string += n;
}
std::size_t num = hash(hash_string);

// Change numerical to alphanumeric hash representation
for (int i = 0; i < NAME_LEN; ++i) {
name += alphanumeric[num % alphanumeric.size()];
num = num / alphanumeric.size();
}
}
return name;
}

std::string CheckRunner::createCheckRunnerFacility(std::string deviceName)
{
// it starts with "check/" and is followed by the unique part of the device name truncated to a maximum of 32 characters.
string facilityName = "check/" + deviceName.substr(CheckRunner::createCheckRunnerIdString().length() + 1, string::npos);
facilityName = facilityName.substr(0, 32);
return facilityName;
}

std::string CheckRunner::createSinkCheckRunnerName(InputSpec input)
{
std::string name(CheckRunner::createCheckRunnerIdString() + "-sink-");
name += DataSpecUtils::label(input);
return name;
return CheckRunner::createCheckRunnerIdString();
}

o2::framework::Outputs CheckRunner::collectOutputs(const std::vector<CheckConfig>& checkConfigs)
Expand All @@ -130,13 +83,12 @@ o2::framework::Outputs CheckRunner::collectOutputs(const std::vector<CheckConfig
return outputs;
}

CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs)
: mDetectorName(getDetectorName(checkConfigs)),
mDeviceName(createCheckRunnerName(checkConfigs)),
mDeviceName(createCheckRunnerName()),
mConfig(std::move(checkRunnerConfig)),
/* All checks have the same Input */
mInputs(checkConfigs.front().inputSpecs),
mOutputs(CheckRunner::collectOutputs(checkConfigs)),
mInputs{ inputs },
mOutputs{ CheckRunner::collectOutputs(checkConfigs) },
mTotalNumberObjectsReceived(0),
mTotalNumberCheckExecuted(0),
mTotalNumberQOStored(0),
Expand All @@ -148,19 +100,6 @@ CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<
}
}

CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, InputSpec input)
: mDeviceName(createSinkCheckRunnerName(input)),
mConfig(std::move(checkRunnerConfig)),
mInputs{ input },
mOutputs{},
mTotalNumberObjectsReceived(0),
mTotalNumberCheckExecuted(0),
mTotalNumberQOStored(0),
mTotalNumberMOStored(0),
mTotalQOSent(0)
{
}

CheckRunner::~CheckRunner()
{
ILOG(Debug, Trace) << "CheckRunner destructor (" << this << ")" << ENDM;
Expand Down Expand Up @@ -212,7 +151,7 @@ void CheckRunner::refreshConfig(InitContext& iCtx)
void CheckRunner::init(framework::InitContext& iCtx)
{
try {
core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, createCheckRunnerFacility(mDeviceName));
core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, mDeviceName);
refreshConfig(iCtx);
Bookkeeping::getInstance().init(mConfig.bookkeepingUrl);
initDatabase();
Expand Down Expand Up @@ -258,7 +197,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx)

auto now = getCurrentTimestamp();
store(qualityObjects, now);
store(mMonitorObjectStoreVector, now);
store(mToBeStored, now);

send(qualityObjects, ctx.outputs());

Expand All @@ -270,7 +209,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx)

void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)
{
mMonitorObjectStoreVector.clear();
mToBeStored.clear();

for (const auto& input : mInputs) {
auto dataRef = inputRecord.get(input.binding.c_str());
Expand Down Expand Up @@ -300,7 +239,6 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)

// for each item of the array, check whether it is a MonitorObject. If not, create one and encapsulate.
// Then, store the MonitorObject in the various maps and vectors we will use later.
bool store = mInputStoreSet.count(DataSpecUtils::label(input)) > 0; // Check if this CheckRunner stores this input
for (const auto tObject : *array) {
std::shared_ptr<MonitorObject> mo{ dynamic_cast<MonitorObject*>(tObject) };

Expand All @@ -318,9 +256,8 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)
updatePolicyManager.updateObjectRevision(mo->getFullName());
mTotalNumberObjectsReceived++;

if (store) { // Monitor Object will be stored later, after possible beautification
mMonitorObjectStoreVector.push_back(mo);
}
// Monitor Object will be stored later, after possible beautification
mToBeStored.push_back(mo);
}
}
}
Expand Down Expand Up @@ -554,7 +491,7 @@ void CheckRunner::reset()
mTotalQOSent = 0;
}

std::string CheckRunner::getDetectorName(const std::vector<CheckConfig> checks)
std::string CheckRunner::getDetectorName(const std::vector<CheckConfig>& checks)
{
std::string detectorName;
for (auto& check : checks) {
Expand Down
39 changes: 20 additions & 19 deletions Framework/src/CheckRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,34 @@
#include "QualityControl/CheckRunner.h"
#include "QualityControl/CheckRunnerFactory.h"
#include "QualityControl/CommonSpec.h"
#include <set>

namespace o2::quality_control::checker
{

using namespace o2::framework;

DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector)
DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
{
auto options = checkRunnerConfig.options;
CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs };
qcCheckRunner.setTaskStoreSet({ storeVector.begin(), storeVector.end() });

// concatenate all inputs
o2::framework::Inputs allInputs;
for (auto config : checkConfigs) {
allInputs.insert(allInputs.end(), config.inputSpecs.begin(), config.inputSpecs.end());
}

// We can end up with duplicated inputs that will later lead to circular dependencies on the checkRunner device.
o2::framework::Inputs allInputsNoDups;
std::set<std::string> alreadySeen;
for (auto input : allInputs) {
if (alreadySeen.count(input.binding) == 0) {
allInputsNoDups.push_back(input);
}
alreadySeen.insert(input.binding);
}

CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs, allInputsNoDups };

DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
qcCheckRunner.getInputs(),
Expand All @@ -49,22 +66,6 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig
return newCheckRunner;
}

DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input)
{
CheckRunner qcCheckRunner{ checkRunnerConfig, input };
qcCheckRunner.setTaskStoreSet({ DataSpecUtils::label(input) });

DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
qcCheckRunner.getInputs(),
Outputs{ qcCheckRunner.getOutputs() },
adaptFromTask<CheckRunner>(std::move(qcCheckRunner)),
checkRunnerConfig.options,
{},
{ o2::framework::ecs::qcReconfigurable } };

return newCheckRunner;
}

void CheckRunnerFactory::customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies)
{
auto matcher = [label = CheckRunner::getCheckRunnerLabel()](framework::DeviceSpec const& device) {
Expand Down
Loading
Loading