Skip to content

Commit

Permalink
Add worker classes
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathonmcmurray committed Feb 23, 2019
1 parent 7555baf commit 8d109a1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
2 changes: 1 addition & 1 deletion worker/register.q
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
\d .worker

master:hopen `$.z.x 0;
master(`.worker.register;.z.h;.z.i);
master(`.worker.register;.z.h;.z.i;`$.z.x 1);

out:{master(1;x)}

Expand Down
58 changes: 31 additions & 27 deletions worker/worker.q
Original file line number Diff line number Diff line change
@@ -1,45 +1,49 @@
\d .worker

qline:"q $QPATH/worker/register.q ",(":"sv("";string .z.h;string system"p"))," &"; //q start-up line
preline:""; //placeholder for any preamble required
qline:"q $QPATH/worker/register.q ",(":"sv("";string .z.h;string system"p")); //q start-up line
preline:""; //placeholder for any preamble required

sline:{$[count preline;preline," && ";""],qline} //build startup line
sline:{[c]$[count preline;preline," && ";""],qline," ",string[c]," &"} //build startup line

create:{system sline[]} //create local worker
rcreate:{[r]system"ssh ",r," '",sline[],"' &"} //create remote worker given remote name & q path
create:{[c]c:$[c~(::);`;c];system sline c} //create local worker (c:class)
rcreate:{[c;r]system"ssh ",r," '",sline[c],"' &"} //create remote worker given remote name & q path

workers:([] host:();handle:();pid:())
workers:([]host:();handle:();pid:();class:())

preset:() //queue for variables to set
prerun:() //queue for commands to run
prereq:() //queue for packages to require
preset:()!() //queue for variables to set
prerun:()!() //queue for commands to run
prereq:()!() //queue for packages to require

.worker.set:{[x;y] /x:variable;y:value
if[count .worker.workers;
.worker.workers[`handle]@\:(set;x;y)]; //set on existant workers
preset,:enlist(x;y); //queue for future workers
.worker.set:{[c;x;y] /c:class,x:variable;y:value
wk:$[c=`;.worker.workers;select from .worker.workers where class in c];
if[count wk;
wk[`handle]@\:(set;x;y)]; //set on existant workers
preset[c],:enlist(x;y); //queue for future workers
}

.worker.run:{[x] /x:string to run
if[count .worker.workers;
.worker.workers[`handle]@\:(value;x)]; //run on existant workers
prerun,:enlist x; //queue for future workers
.worker.run:{[c;x] /c:class,x:string to run
wk:$[c=`;.worker.workers;select from .worker.workers where class in c];
if[count wk;
wk[`handle]@\:(value;x)]; //run on existant workers
prerun[c],:enlist x; //queue for future workers
}

.worker.require:{[x] /x:package
if[count .worker.workers;
.worker.workers[`handle]@\:(`.utl.require;x)]; //require on existant workers
prereq,:enlist x; //queue for future workers
.worker.require:{[c;x] /c:class,x:package
wk:$[c=`;.worker.workers;select from .worker.workers where class in c];
if[count wk;
wk[`handle]@\:(`.utl.require;x)]; //require on existant workers
prereq[c],:enlist x; //queue for future workers
}

register:{
`.worker.workers upsert (x;.z.w;y);
if[count preset;neg[.z.w](set'),flip preset]; //set queued variables
if[count prerun;neg[.z.w](value'),enlist prerun]; //run queued commands
if[count prereq;neg[.z.w](`.utl.require'),enlist prereq]; //req queued packages
`.worker.workers upsert (x;.z.w;y;z);
f:{x where 0<count'[x]};
if[count preset;neg[.z.w](set'),flip raze preset`,z]; //set queued variables
if[count prerun;neg[.z.w](value'),enlist f raze prerun`,z]; //run queued commands
if[count prereq;neg[.z.w](`.utl.require'),enlist f raze prereq`,z]; //req queued packages
}

\d .

.z.exit:{[x;y]neg[.worker.workers[`handle]]@\:"exit 0";x[]}@[value;`.z.exit;{{}}]; //close all workers on exit, maintain existing .z.exit
.z.pc:{x y;delete from `.worker.workers where handle=y}@[value;`.z.pc;{{}}]; //maintain existing .z.pc & delete from .worker.workers
.z.exit:{[x;y]neg[.worker.workers[`handle]]@\:"exit 0";x[]}@[value;`.z.exit;{{}}]; //close all workers on exit, maintain existing .z.exit
.z.pc:{x y;delete from `.worker.workers where handle=y}@[value;`.z.pc;{{}}]; //maintain existing .z.pc & delete from .worker.workers

0 comments on commit 8d109a1

Please sign in to comment.