Skip to content

Commit

Permalink
#3: migrating code to be compatible with WRENCH 1.5
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelfsilva committed Mar 5, 2020
1 parent 684720d commit d9c2935
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 116 deletions.
95 changes: 8 additions & 87 deletions src/DAGManScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,94 +51,16 @@ namespace wrench {
for (auto task : tasks) {
WorkflowJob *job = nullptr;

// if (task->getTaskType() == WorkflowTask::TaskType::TRANSFER) {
// // data stage in/out task
// std::map<WorkflowFile *, std::shared_ptr<StorageService>> file_locations;
// std::set<std::tuple<WorkflowFile *, std::shared_ptr<StorageService>, std::shared_ptr<StorageService>>> pre_file_copies;
// std::set<std::tuple<WorkflowFile *, std::shared_ptr<StorageService>, std::shared_ptr<StorageService>>> post_file_copies;
//
// // input files
// for (auto input_file : task->getInputFiles()) {
// // finding storage service
// auto src_dest = (*task->getFileTransfers().find(input_file)).second;
//
// StorageService *src =
// src_dest.first == "local" ? htcondor_service->getLocalStorageService() : nullptr;
// StorageService *dest =
// src_dest.second == "local" ? htcondor_service->getLocalStorageService() : nullptr;
//
// for (auto storage_service : this->storage_services) {
// if (!src && storage_service->getHostname() == src_dest.first) {
// src = storage_service;
// } else if (!dest && storage_service->getHostname() == src_dest.second) {
// dest = storage_service;
// }
// }
//
// if (src != dest) {
// pre_file_copies.insert(std::make_tuple(input_file, src, dest));
// }
// file_locations.insert(std::make_pair(input_file, dest));
// }
//
// // output files
// for (auto output_file : task->getOutputFiles()) {
// // finding storage service
// auto src_dest = (*task->getFileTransfers().find(output_file)).second;
//
// StorageService *src =
// src_dest.first == "local" ? htcondor_service->getLocalStorageService() : nullptr;
// StorageService *dest =
// src_dest.second == "local" ? htcondor_service->getLocalStorageService() : nullptr;
//
// for (auto storage_service : this->storage_services) {
// if (!src && storage_service->getHostname() == src_dest.first) {
// src = storage_service;
// } else if (!dest && storage_service->getHostname() == src_dest.second) {
// dest = storage_service;
// }
// }
//
// if (src != dest) {
// post_file_copies.insert(std::make_tuple(output_file, src, dest));
// }
// file_locations.insert(std::make_pair(output_file, dest));
// }
//
// // creating and submitting job
// job = (WorkflowJob *) this->getJobManager()->createStandardJob({task},
// file_locations,
// pre_file_copies,
// post_file_copies, {});
//
// } else { // regular compute task
//
// // input/output files
// std::map<WorkflowFile *, StorageService *> file_locations;
// for (auto f : task->getInputFiles()) {
// file_locations.insert(std::make_pair(f, htcondor_service->getLocalStorageService()));
// }
// for (auto f : task->getOutputFiles()) {
// file_locations.insert(std::make_pair(f, htcondor_service->getLocalStorageService()));
// }
//

// check whether files need to be staged in
for (auto file : task->getInputFiles()) {
auto file_locations = this->file_registry_service->lookupEntry(file);
bool needs_transfer = true;
for (auto l : file_locations) {
if (l->getStorageService() == htcondor_service->getLocalStorageService()) {
needs_transfer = false;
break;
}
}
if (needs_transfer) {
// Create a data movement manager
this->getDataMovementManager()->doSynchronousFileCopy(file, *file_locations.begin(),
FileLocation::LOCATION(
htcondor_service->getLocalStorageService(),
"/"));
std::cerr << "ANALYZING FILE: " << file->getID() << std::endl;
if (not htcondor_service->getLocalStorageService()->lookupFile(
file, FileLocation::LOCATION(htcondor_service->getLocalStorageService(), "/"))) {

auto file_locations = this->file_registry_service->lookupEntry(file);
this->getDataMovementManager()->doSynchronousFileCopy(
file, *file_locations.begin(),
FileLocation::LOCATION(htcondor_service->getLocalStorageService(), "/"));
}
}

Expand All @@ -158,7 +80,6 @@ namespace wrench {
// creating job for execution
job = (WorkflowJob *) this->getJobManager()->createStandardJob(task, file_locations);

// }
WRENCH_INFO("Scheduling task: %s", task->getID().c_str());
this->getJobManager()->submitJob(job, htcondor_service);
// create job scheduled event
Expand Down
29 changes: 0 additions & 29 deletions src/PegasusRun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,40 +94,11 @@ int main(int argc, char **argv) {
std::map<std::string, wrench::WorkflowFile *> input_files = workflow->getInputFiles();
std::map<std::string, std::shared_ptr<wrench::StorageService>> storage_services = config.getStorageServicesMap();

// int num_transfer_tasks = 0;
//
// for (auto task : workflow->getTasks()) {
// if (task->getTaskType() == wrench::WorkflowTask::TaskType::TRANSFER) {
// num_transfer_tasks++;
// for (auto file_transfer : task->getFileTransfers()) {
// if (not file_transfer.first->isOutput()) {
// if (file_transfer.second.first == "local") {
// simulation.stageFile(file_transfer.first, htcondor_service->getLocalStorageService());
// } else {
// simulation.stageFile(file_transfer.first, storage_services.at(file_transfer.second.first));
// }
// }
// }
// }
// }
//
// wrench::WorkflowTask *stagein_task = workflow->addTask("STAGE_IN", 0, 1, 1, 1.0, 0.0);

// if (num_transfer_tasks == 0) {
// handle the XML import case where there are no transfer tasks
for (auto file : input_files) {
for (auto storage_service : storage_services) {
simulation.stageFile(file.second, storage_service.second);
}
// simulation.stageFile(file.second, htcondor_service->getLocalStorageService());
// stagein_task->addInputFile(file.second);
}
// }

// // adding dependency between data stage in task and the first tasks in the workflow
// for (auto task : workflow->getReadyTasks()) {
// workflow->addControlDependency(stagein_task, task);
// }

// simulation execution
WRENCH_INFO("Launching the Simulation...");
Expand Down

0 comments on commit d9c2935

Please sign in to comment.