distributed.py - definitely needs update from stopevent; having headaches coding #101
Hi @drallensmith, sorry for causing you some headaches.
We could try to request the value manually every time the
primary/secondary is definitely better than master/slave (and even better than server/client, which I used before master/slave). Regarding #51: by now I have a small ARM cluster for testing, but setting it up will take some more time.
Forking it sounds fine, and no problem re the headaches - thanks for your contribution! At this point I am actually more concerned about the timeouts and similar errors than about pickling - with the exception of the one due to self.started not being set to False by de.stop, they come prior to any attempt to "resurrect" the code. It may be most informative for you to try running the test_xor_example_distributed.py file with a mostly-unmodified distributed.py (just change the terminology) - oops on my part for not cleanly committing just the terminology change; the commit before the most recent two, starting with 42c878, may be most useful. I'm glad you like primary/secondary - it isn't quite server/client either, since the secondary nodes aren't making requests of the primary node. The ARM cluster is good news; @D0pa might also be able to help on that.
I want to contribute but fail to understand the following: Additionally, I see the biggest advantage of using MPI in the future being Xeon Phi cards, which hopefully will become more affordable.
I understand your question, and it's a good one - see the comment I just made on #51 for some thoughts on the matter.
@bennr01 - I have just uploaded a new commit to my fork's config_work branch; I went back to using Value for a
However, test_distributed.py now either hangs on trying to join the single-worker secondary node (if not trying the multiple-worker version), or the child evaluation process (multiple workers) gets an EOFError from the inqueue being closed. (The latter may be fixable by "telling" it this is the equivalent of do_stop being turned on... I am admittedly a bit puzzled by why the code currently treats managers.RemoteError as if it were queue.Empty - this makes sense for the primary, but I'm not sure it does for the secondaries.)
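To make the puzzle above concrete, here is a minimal, hypothetical helper (not the real distributed.py code) sketching the distinction being discussed: on the primary, a RemoteError while polling can reasonably be treated like an empty queue and retried, but on a secondary it more likely means the link to the manager is gone.

```python
import queue
from multiprocessing import managers

def classify_get_error(exc, is_primary):
    """Return 'retry' if the error should be treated like an empty queue,
    'disconnect' if the node should assume the connection is broken.
    (Illustrative policy only, not the module's actual behavior.)"""
    if isinstance(exc, queue.Empty):
        return "retry"
    if isinstance(exc, managers.RemoteError):
        # Treating RemoteError as queue.Empty makes sense for the primary,
        # which can simply poll again; a secondary has probably lost its
        # manager connection and should not spin forever.
        return "retry" if is_primary else "disconnect"
    if isinstance(exc, EOFError):
        # The inqueue was closed underneath us, e.g. at shutdown.
        return "disconnect"
    raise exc
```

The point of separating the two cases is exactly the asymmetry noted above: the primary can afford to keep polling, while a secondary that keeps retrying a dead manager just hangs.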
@D0pa The main advantage of using
The reason I chose to use
Thanks for offering your help in both this issue and in #51. My cluster is actually already set up; the only issue is that
@drallensmith The real manager and all objects it holds are stored in a separate child process (according to the documentation of
Good point regarding the RemoteError. Perhaps
Regarding putting back the stop value: since there is also code for the primary node to put it onto the queue again if it's empty, the putting-back of the stop value could be removed as a check.
OK, I implemented the first of the above. test_distributed.py is generally working... and I realized a problem with restoring from a checkpoint the way I was trying in test_xor_example_distributed.py - in the current setup, while run_primary has the information needed to restore (such as the generation of the last checkpoint), it doesn't have the information on how to recreate the secondary processes/nodes. Without those, if you try to restart it, it hangs. I'm currently trying to see if exit_on_stop=False will help.
No, still hangs. After travis gets through running all the tests except test_xor_example_distributed.py, I'll upload a version with that test so that people can explore the problem more. BTW, I changed the Queue to use multiprocessing.Queue - queue.Queue, as it turns out, is not multiprocess-safe. There's also a
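The queue.Queue vs. multiprocessing.Queue distinction mentioned above can be shown in a few lines: a plain queue.Queue only synchronizes threads within one process, while multiprocessing.Queue pipes pickled items between processes, which is what the primary/secondary split actually needs. This sketch assumes a fork-based start method (as on Linux); the function name is invented for illustration.

```python
import multiprocessing

def double_one_item(q_in, q_out):
    # Runs in a child process; the item crosses the process boundary
    # by being pickled and sent over a pipe.
    q_out.put(q_in.get() * 2)

q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
child = multiprocessing.Process(target=double_one_item, args=(q_in, q_out))
child.start()
q_in.put(21)
result = q_out.get()  # blocks until the child answers
child.join()
```

Had q_in and q_out been queue.Queue instances, the child process would have gotten its own copies and the parent would block forever on q_out.get().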
Huh. travis is having a problem with anything above Python 3.5 - authentication errors. It's also claiming that manager in manager.RemoteError is not defined... ah; misspelling, should be managers. Fixing. |
Currently, anything above 3.5 (3.6, etc.) is timing out, and when the process is interrupted, it says it was having an AuthenticationError. If that falls under RemoteError, then the primary node would ignore it, starting a loop... but why didn't the secondary nodes exit? Or did they?
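One quick way to answer the "would that come under RemoteError?" question is to check the exception hierarchy directly: AuthenticationError derives from multiprocessing.ProcessError, not from managers.RemoteError, so a handler catching only RemoteError would not swallow it.

```python
import multiprocessing
from multiprocessing import managers

# AuthenticationError is raised on an authkey mismatch when connecting;
# it is a ProcessError subclass, unrelated to managers.RemoteError.
auth_is_remote = issubclass(multiprocessing.AuthenticationError,
                            managers.RemoteError)
auth_is_process_error = issubclass(multiprocessing.AuthenticationError,
                                   multiprocessing.ProcessError)
```

So the primary's RemoteError-ignoring loop would not be triggered by an AuthenticationError; it would propagate (or surface as a hang elsewhere).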
I've gone ahead and uploaded a version with test_xor_example_distributed.py, while letting the other travis build run out. I also added
@drallensmith the
Good thinking. Of course, at least part of the question then would be "what changed with 3.6+ to give this error when it didn't happen before". Perhaps s.recv_bytes() changed?
Huh. Looks like pypy, as well as following the 2.7.13 version of the language, follows it in networking internals too - it did not error out for test_distributed.py. Ditto for pypy3 following 3.5.3 in networking. Looks like it's Python code, not C/C++ internal code, that makes the difference.
Well, I'm no longer seeing AuthenticationErrors from the secondaries - but, as far as I can tell, the primaries are still hanging. Sigh... see here for the travis build.
At least one question that needs solving is: what should the primary node do if there's an error that likely indicates the secondaries are out of contact? In particular, if they are out of contact (pipe for queue broken, socket closed, whatever) but are actually still running (exit_on_stop=False, for instance)? Would it work to re-get the queue in question from the manager?
I think the behavior of the primary node should depend on whether the error is recoverable or not. If the secondary nodes are offline, but the evaluation could continue once a new secondary connects to the primary node, then the error should probably simply be ignored (sometimes some nodes of a cluster may reboot, especially if the cluster relocates processes according to the usage of the nodes).
It seems like re-getting the queue from the manager would create a new connection, so the new queue would be connected again. However, any items consumed from the first queue but not processed may result in a frozen primary node (if I recall correctly, the primary node keeps track of the number of genomes it still has to wait for, so a missing item would result in an infinite loop). I just had an idea about the problem with the stopevent. Your idea of using a queue for checking whether the secondary nodes should stop is fine, but why do we actually
Have you checked if these errors also appear outside of travis?
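The "re-get the queue from the manager" idea above can be sketched as a small retry loop. This is a hypothetical helper, not the module's real API: fetch_queue stands in for something like a fresh manager.get_inqueue() call, and connection-level failures are treated as "fetch a fresh proxy and try again" rather than fatal.

```python
def get_with_refetch(fetch_queue, max_retries=3):
    """Try to get one item, re-fetching the queue proxy on broken-pipe
    style errors. fetch_queue() must return a (possibly fresh) queue-like
    object with get_nowait(). Illustrative sketch only."""
    last_exc = None
    for _ in range(max_retries):
        q = fetch_queue()  # may hand back a brand-new proxy each time
        try:
            return q.get_nowait()
        except (EOFError, OSError) as exc:
            last_exc = exc  # proxy broken; try a fresh one next pass
    raise ConnectionError("queue unreachable after %d tries" % max_retries) from last_exc
```

As noted above, this only repairs the connection, not the bookkeeping: any item consumed through the old proxy but never processed is still lost, so the primary's count of outstanding genomes must be handled separately.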
Regarding being able to restart the evaluation, or at least the population - that's pretty much exactly what I was testing for with checkpointing; it currently works with non-distributed NEAT, and I suspect that anyone wanting to do (serious) distributed computing is going to have a big enough task at hand that the loss due to not checkpointing will be much more unacceptable. If the primary detects that it needs to reinitialize the connections to the secondaries, then it should probably put any genomes that it sent out and didn't get fitnesses back for into a pool that, once the other genomes are done, is sent out again if it still hasn't received fitnesses for them by then. Alternatively or as well, the secondaries could keep track of what fitness information they've gathered so far and not automatically shut down completely or otherwise become unavailable, with some mechanism to inquire for this (or even simply having them send back a pickled dict of genome id vs. fitness as the first thing in the queue going back to the primary node, once a connection was re-established). In regard to why not always stop the secondary process, several thoughts:
The stopevent idea should work - good thought! The queues are what's currently working best, as far as I can tell... urr... but that's with queues that are continuously going. It may be hard to tell the difference between an empty queue and a broken queue.

Hmm... currently, one difficulty is that the primary doesn't actually know how many secondaries it is supposed to have available. If it had that information, it could just stick a series of "stop" tokens on the queue going to the secondaries and wait to receive an equal number of acknowledging "I'm stopping" tokens on the return queue. [Admittedly, some but not all of the secondaries going down could be a difficulty with this... hmm... I do note that the multiprocessing queue implementation, unlike the queue.Queue one, has the capability of a "close" command. Perhaps the primary could simply keep sending stop tokens until all the secondaries that were connected had closed their connections?]

The errors (as in hangs) with the test_xor_example_distributed.py test script definitely happen outside of travis. I don't know about the hangs in test_distributed.py itself, which are only happening in Python 3.6+; locally I have set up 2.7.13, 3.5.3, pypy2-5.8.0, and pypy3-5.8.0 (although I usually only test locally with 2.7.13 and 3.5.3). I'm a bit low on disk and other space, but can try to set up 3.6.x locally as well... but even if that part is a travis bug interacting with 3.6+ Python, that still leaves the test_xor_example_distributed.py failures (in restarting after a checkpoint). Being able to restart after a checkpoint also tests the resilience of the connection/reconnection code about as well as can be done without using multiple machines (as in the best we can do with readily-available automatable testing, as far as I can tell). Actual testing with more than one machine is also needed, however, to make sure the underlying code isn't taking shortcuts enabled by being on the same machine (e.g., queue.Queue is reliant on this, as far as I can tell).
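The stop-token handshake described above can be sketched quickly; token values and function names here are invented, and plain queues with threads stand in for the managed queues and remote secondaries. The primary puts one STOP per known secondary on the outgoing queue, then waits for the same number of ACKs on the return queue.

```python
STOP, ACK = "__stop__", "__ack__"  # sentinel tokens (illustrative)

def secondary_loop(jobs, results, evaluate):
    # Evaluate jobs until a STOP token arrives, then acknowledge and exit.
    while True:
        job = jobs.get()
        if job == STOP:
            results.put(ACK)
            return
        results.put(evaluate(job))

def primary_shutdown(jobs, results, num_secondaries):
    # Ask every secondary to stop, then wait for all acknowledgements.
    for _ in range(num_secondaries):
        jobs.put(STOP)
    acks = 0
    while acks < num_secondaries:
        if results.get() == ACK:
            acks += 1
    return acks
```

This is exactly where the "primary doesn't know how many secondaries exist" problem bites: num_secondaries has to come from somewhere, and a secondary that died without acking would leave primary_shutdown blocked unless a timeout is added.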
@drallensmith i have pushed an updated version of 'distributed.py' (based on your changes in 'config_work') to the 'distributed_rewrite' branch in my fork. It is still not finished, but here are a few of the changes:
Good point. |
Definite progress; the EOFErrors (and similar) need some work, but still definite progress. I wasn't getting anywhere but frustrated, so - after working on some non-online matters - decided to work on some other aspects for a bit; I've used mypy to help me check my multiparameter function code + the config_work documentation, and have indeed found some places needing changes.
I think I may have managed to make distributed.py a bit more reliable and faster to test. I was working on other things and being hindered by how long the various distributed.py tests were taking (and by the intermittent failures), so investigated further using

I noticed that in all cases I was seeing, the common problem was a built-in timeout in connecting for updating the state - not in connecting for a queue. One can control the maximum time spent trying to retrieve from (or put into) a queue, and keep trying again until it works; there is no such timeout control for various other proxy objects. I therefore tried out a combination of the "put a stop token on the queue" idea from above with some further checks on how long things are taking - most particularly, how long it's been since the secondary processes/threads have successfully gotten a job or transmitted results back.

@bennr01, could you and @D0pa please take a look at the code? It is in my fork's master branch. Quite a bit of it is unfortunately heuristic guesses as to good timeouts and other variables, albeit with the possibility of the caller specifying some of them; it is also rather complex code (McCabe cyclomatic complexity score of 33...). I am certain a lot of improvements are possible - for instance: sending some sort of "keepalive" token to the secondaries if it is desired to use them again; having an option for the user to tell the primary process/thread how many secondaries there are, and otherwise having more control over how many times the primary sends out a stop token; the idea I mentioned above regarding caching results in case of queue data loss; or the secondaries, if they were told at the start of each generation how many genomes they were expected to process, taking this into account in deciding whether it had been too long since a contact. It also badly needs testing in an actual multi-machine situation.
I am also going to investigate whether pypy/pypy3 can be made to work on the non-threading versions - I think I'll fork off a separate branch for that testing.
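The "how long since the last successful contact" heuristic described above can be sketched as a tiny tracker; the class name, method names, and threshold are invented for illustration, not taken from the actual code. The idea is that individual queue operations get short timeouts, but the secondary only presumes the primary dead after a longer overall silence.

```python
import time

class ContactTracker:
    """Track time since the last successful exchange with the primary.
    Illustrative sketch; thresholds would be heuristic in practice."""

    def __init__(self, max_silence_seconds):
        self.max_silence = max_silence_seconds
        self.last_contact = time.monotonic()

    def note_success(self):
        # Call after any successful get (job received) or put (result sent).
        self.last_contact = time.monotonic()

    def presumed_dead(self):
        # True once no contact has succeeded for longer than the threshold.
        return (time.monotonic() - self.last_contact) > self.max_silence
```

Using time.monotonic() rather than time.time() avoids false positives if the system clock is adjusted while the cluster is running.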
Interesting... Thanks for investigating this.
It seems to me that we just handle the errors better for the queues than we do for the
I thought that the queue timeout would behave like the normal timeout for
The intended behavior for the reconnection and re-usability is to always reconnect if
Sigh... Maybe we should stop using multiprocessing and rewrite
BTW, in
Quite welcome. I may have gotten pypy to work for non-threading tests.

I am suspecting, but definitely don't know for sure, that the queue timeout can, at least sometimes, work for more than queue-empty because it's a timeout on recv or send, as appropriate. Admittedly, that would not explain doing better for connect, but that may also be a matter of proxied booleans reconnecting (as in redoing

I have not yet succeeded in getting the secondaries to reconnect if the primary manager shuts down. I'm currently trying out essentially using whether _reset_em() is successful as a test - that doesn't require that there be anything in the queues, but, from observation of some errors, does require connect to work.

Good question re sockets; I wouldn't do that quite yet -

Thank you for pointing that out on
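The "use whether _reset_em() succeeds as a liveness test" idea amounts to probing the manager with a fresh client connection. Here is a rough, hedged sketch of such a probe; the address and authkey are placeholders, and the function name is invented (it is not the module's real API).

```python
from multiprocessing import AuthenticationError, managers

def manager_reachable(address, authkey):
    """Return True if a fresh SyncManager client can connect to the
    primary's manager at `address`. Illustrative sketch only."""
    mgr = managers.SyncManager(address=address, authkey=authkey)
    try:
        mgr.connect()  # raises if refused, reset, or authentication fails
        return True
    except (OSError, EOFError, AuthenticationError):
        return False
```

Unlike polling a queue, this needs nothing to be enqueued: it only requires that the manager's server socket accept a connection, which matches the observation that connect has to work for _reset_em() to succeed.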
Sigh... looks like pypy is still erratic. I may try to set up a .travis.yml file such that pypy + pypy3 are used twice - once without distributed tests, once for just those but allowed to fail - in order to get more data on what's happening.
@bennr01: I wanted to let you (and others) know that I tried making the queue.Queues multiprocessing.Queues instead, with close being called at appropriate points, and apparently managed to find an internal Python bug, causing mysterious AssertionErrors in the multiprocessing module. (At the minimum, it's a bug that it's not more informative than "AssertionError"...) See the multiprocessing_internal_error branch in my fork. I've put in a bug report at bugs.python.org. |
@bennr01: Hi. I wanted to check to make sure that distributed evaluation worked with checkpointing (classes that aren't built at the top level of a file, such as the _EvaluatorSyncManager, can be problematic for that - one reason for my concern), so I put together a test_xor_example_distributed.py test case, which you can see in my fork's config_work branch. However, with no alterations to distributed.py, it usually had a timeout error while waiting for the stopevent to connect. The one time it didn't, it thought the DistributedEvaluator was still running (since de.started was not reset to False by de.stop()), and so errored out anyway. The fix for the latter error is easy; the former, assuming it's a problem with using an event instead of a Value, is another matter.
I've been trying to make the changeover to stopevent but am having some major headaches getting a proxy value distributed between multiple processes that may be on different machines (my initial try, putting a Value from the syncmanager into the queue as the first entry, would have worked... except that the other secondary processes didn't get the Value! Oops....). Any suggestions? Using a Namespace, which was my most recent try at replacing Value, isn't working either.
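One way around this particular headache, sketched here with invented names (not the module's real classes), is to stop shipping the shared object through the queue entirely: register a zero-argument callable on the manager class that always returns the same server-side Event. Every node that connects, whenever it connects, then gets a proxy to the one shared stop flag. This sketch assumes a fork-based start method.

```python
import threading
from multiprocessing import managers

class StopFlagManager(managers.SyncManager):
    pass

# This Event lives in the manager's server process; all clients share it.
_shared_stop = threading.Event()

StopFlagManager.register(
    "get_stop_event",
    callable=lambda: _shared_stop,
    proxytype=managers.EventProxy,
)
```

Because the callable runs server-side, a secondary joining late still sees the same flag - which is the property that putting a Value into the queue as the first entry could not provide.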
I also changed the "master/slave" terminology to "primary/secondary", which is more modern usage.