Global phase staggering #514

Open · wants to merge 1 commit into base: development
3 changes: 3 additions & 0 deletions src/plotman/configuration.py
@@ -87,6 +87,9 @@ def get_dst_directories(self):
 class Scheduling:
     global_max_jobs: int
     global_stagger_m: int
+    global_stagger_phase_major: int
+    global_stagger_phase_minor: int
+    global_stagger_phase_limit: int
     polling_time_s: int
     tmpdir_max_jobs: int
     tmpdir_stagger_phase_major: int
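Note: Scheduling is the schema for the scheduling: section of plotman.yaml, so annotating these three fields without defaults presumably makes them required keys; existing configs would need to add them, as in the plotman.yaml change below.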
177 changes: 92 additions & 85 deletions src/plotman/manager.py
@@ -84,95 +84,102 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg):
     elif len(jobs) >= sched_cfg.global_max_jobs:
         wait_reason = 'max jobs (%d) - (%ds/%ds)' % (sched_cfg.global_max_jobs, youngest_job_age, global_stagger)
     else:
-        tmp_to_all_phases = [(d, job.job_phases_for_tmpdir(d, jobs)) for d in dir_cfg.tmp]
-        eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases
-                if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ]
-        rankable = [ (d, phases[0]) if phases else (d, job.Phase(known=False))
-                for (d, phases) in eligible ]
-
-        if not eligible:
-            wait_reason = 'no eligible tempdirs (%ds/%ds)' % (youngest_job_age, global_stagger)
-        else:
-            # Plot to oldest tmpdir.
-            tmpdir = max(rankable, key=operator.itemgetter(1))[0]
-
-            # Select the dst dir least recently selected
-            dst_dir = dir_cfg.get_dst_directories()
-            if dir_cfg.dst_is_tmp():
-                dstdir = tmpdir
-            else:
-                dir2ph = { d:ph for (d, ph) in dstdirs_to_youngest_phase(jobs).items()
-                        if d in dst_dir and ph is not None}
-                unused_dirs = [d for d in dst_dir if d not in dir2ph.keys()]
-                dstdir = ''
-                if unused_dirs:
-                    dstdir = random.choice(unused_dirs)
-                else:
-                    dstdir = max(dir2ph, key=dir2ph.get)
-
-            logfile = os.path.join(
-                dir_cfg.log, pendulum.now().isoformat(timespec='microseconds').replace(':', '_') + '.log'
-            )
-
-            plot_args = ['chia', 'plots', 'create',
-                    '-k', str(plotting_cfg.k),
-                    '-r', str(plotting_cfg.n_threads),
-                    '-u', str(plotting_cfg.n_buckets),
-                    '-b', str(plotting_cfg.job_buffer),
-                    '-t', tmpdir,
-                    '-d', dstdir ]
-            if plotting_cfg.e:
-                plot_args.append('-e')
-            if plotting_cfg.farmer_pk is not None:
-                plot_args.append('-f')
-                plot_args.append(plotting_cfg.farmer_pk)
-            if plotting_cfg.pool_pk is not None:
-                plot_args.append('-p')
-                plot_args.append(plotting_cfg.pool_pk)
-            if dir_cfg.tmp2 is not None:
-                plot_args.append('-2')
-                plot_args.append(dir_cfg.tmp2)
-            if plotting_cfg.x:
-                plot_args.append('-x')
-
-            logmsg = ('Starting plot job: %s ; logging to %s' % (' '.join(plot_args), logfile))
-
-            try:
-                open_log_file = open(logfile, 'x')
-            except FileExistsError:
-                # The desired log file name already exists. Most likely another
-                # plotman process already launched a new process in response to
-                # the same scenario that triggered us. Let's at least not
-                # confuse things further by having two plotting processes
-                # logging to the same file. If we really should launch another
-                # plotting process, we'll get it at the next check cycle anyways.
-                message = (
-                    f'Plot log file already exists, skipping attempt to start a'
-                    f' new plot: {logfile!r}'
-                )
-                return (False, logmsg)
-            except FileNotFoundError as e:
-                message = (
-                    f'Unable to open log file. Verify that the directory exists'
-                    f' and has proper write permissions: {logfile!r}'
-                )
-                raise Exception(message) from e
-
-            # Preferably, do not add any code between the try block above
-            # and the with block below. IOW, this space intentionally left
-            # blank... As is, this provides a good chance that our handle
-            # of the log file will get closed explicitly while still
-            # allowing handling of just the log file opening error.
-
-            with open_log_file:
-                # start_new_sessions to make the job independent of this controlling tty.
-                p = subprocess.Popen(plot_args,
-                    stdout=open_log_file,
-                    stderr=subprocess.STDOUT,
-                    start_new_session=True)
-
-            psutil.Process(p.pid).nice(15)
-            return (True, logmsg)
+        milestone = job.Phase(
+            major=sched_cfg.global_stagger_phase_major,
+            minor=sched_cfg.global_stagger_phase_minor,
+        )
+        if len([j for j in jobs if j.progress() < milestone]) >= sched_cfg.global_stagger_phase_limit:
+            wait_reason = 'max jobs (%d) before phase [%d : %d] - (%ds/%ds)' % (sched_cfg.global_stagger_phase_limit, sched_cfg.global_stagger_phase_major, sched_cfg.global_stagger_phase_minor, youngest_job_age, global_stagger)
+        else:
+            tmp_to_all_phases = [(d, job.job_phases_for_tmpdir(d, jobs)) for d in dir_cfg.tmp]
+            eligible = [ (d, phases) for (d, phases) in tmp_to_all_phases
+                    if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ]
+            rankable = [ (d, phases[0]) if phases else (d, job.Phase(known=False))
+                    for (d, phases) in eligible ]
+
+            if not eligible:
+                wait_reason = 'no eligible tempdirs (%ds/%ds)' % (youngest_job_age, global_stagger)
+            else:
+                # Plot to oldest tmpdir.
+                tmpdir = max(rankable, key=operator.itemgetter(1))[0]
+
+                # Select the dst dir least recently selected
+                dst_dir = dir_cfg.get_dst_directories()
+                if dir_cfg.dst_is_tmp():
+                    dstdir = tmpdir
+                else:
+                    dir2ph = { d:ph for (d, ph) in dstdirs_to_youngest_phase(jobs).items()
+                            if d in dst_dir and ph is not None}
+                    unused_dirs = [d for d in dst_dir if d not in dir2ph.keys()]
+                    dstdir = ''
+                    if unused_dirs:
+                        dstdir = random.choice(unused_dirs)
+                    else:
+                        dstdir = max(dir2ph, key=dir2ph.get)
+
+                logfile = os.path.join(
+                    dir_cfg.log, pendulum.now().isoformat(timespec='microseconds').replace(':', '_') + '.log'
+                )
+
+                plot_args = ['chia', 'plots', 'create',
+                        '-k', str(plotting_cfg.k),
+                        '-r', str(plotting_cfg.n_threads),
+                        '-u', str(plotting_cfg.n_buckets),
+                        '-b', str(plotting_cfg.job_buffer),
+                        '-t', tmpdir,
+                        '-d', dstdir ]
+                if plotting_cfg.e:
+                    plot_args.append('-e')
+                if plotting_cfg.farmer_pk is not None:
+                    plot_args.append('-f')
+                    plot_args.append(plotting_cfg.farmer_pk)
+                if plotting_cfg.pool_pk is not None:
+                    plot_args.append('-p')
+                    plot_args.append(plotting_cfg.pool_pk)
+                if dir_cfg.tmp2 is not None:
+                    plot_args.append('-2')
+                    plot_args.append(dir_cfg.tmp2)
+                if plotting_cfg.x:
+                    plot_args.append('-x')
+
+                logmsg = ('Starting plot job: %s ; logging to %s' % (' '.join(plot_args), logfile))
+
+                try:
+                    open_log_file = open(logfile, 'x')
+                except FileExistsError:
+                    # The desired log file name already exists. Most likely another
+                    # plotman process already launched a new process in response to
+                    # the same scenario that triggered us. Let's at least not
+                    # confuse things further by having two plotting processes
+                    # logging to the same file. If we really should launch another
+                    # plotting process, we'll get it at the next check cycle anyways.
+                    message = (
+                        f'Plot log file already exists, skipping attempt to start a'
+                        f' new plot: {logfile!r}'
+                    )
+                    return (False, logmsg)
+                except FileNotFoundError as e:
+                    message = (
+                        f'Unable to open log file. Verify that the directory exists'
+                        f' and has proper write permissions: {logfile!r}'
+                    )
+                    raise Exception(message) from e
+
+                # Preferably, do not add any code between the try block above
+                # and the with block below. IOW, this space intentionally left
+                # blank... As is, this provides a good chance that our handle
+                # of the log file will get closed explicitly while still
+                # allowing handling of just the log file opening error.
+
+                with open_log_file:
+                    # start_new_sessions to make the job independent of this controlling tty.
+                    p = subprocess.Popen(plot_args,
+                        stdout=open_log_file,
+                        stderr=subprocess.STDOUT,
+                        start_new_session=True)
+
+                psutil.Process(p.pid).nice(15)
+                return (True, logmsg)
 
     return (False, wait_reason)
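
For reviewers unfamiliar with plotman's phase ordering: job.Phase values sort by (major, minor), so the new gate simply counts running jobs whose progress is still before the configured milestone. A minimal, runnable sketch of the same check, using a namedtuple as an illustrative stand-in for plotman's actual Phase class, with hypothetical job phases:

    from collections import namedtuple

    # Illustrative stand-in for plotman's job.Phase (src/plotman/job.py);
    # namedtuples compare lexicographically, i.e. by major first, then minor.
    Phase = namedtuple('Phase', ['major', 'minor'])

    # Hypothetical progress of the currently running jobs.
    job_progress = [Phase(1, 2), Phase(1, 5), Phase(2, 0), Phase(3, 4)]

    milestone = Phase(major=2, minor=1)  # global_stagger_phase_major / _minor
    limit = 3                            # global_stagger_phase_limit

    early = [p for p in job_progress if p < milestone]
    if len(early) >= limit:
        print('wait: %d jobs before phase [2 : 1]' % len(early))
    else:
        print('ok to start a new plot (%d/%d early jobs)' % (len(early), limit))

With these sample values the wait branch is taken, because three jobs (1:2, 1:5, and 2:0) have not yet reached phase 2:1.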

7 changes: 7 additions & 0 deletions src/plotman/resources/plotman.yaml
@@ -107,6 +107,13 @@ scheduling:
         # Don't run more than this many jobs at a time in total.
         global_max_jobs: 12
 
+        # You can also limit the number of jobs by phase globally, which is
+        # useful for limiting memory or CPU use. These example values allow
+        # at most three jobs that have not yet reached phase 2:1, across all
+        # temp dirs.
+        global_stagger_phase_major: 2
+        global_stagger_phase_minor: 1
+        global_stagger_phase_limit: 3
+
         # Don't run any jobs (across all temp dirs) more often than this, in minutes.
         global_stagger_m: 30
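
As a worked example with these values (job phases hypothetical): if the running jobs are at phases 1:3, 1:6, 2:0, and 3:5, three of them are still before the 2:1 milestone, so the limit of 3 is reached and plotman holds the next plot until one of those jobs passes phase 2:1.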
