
distributed.py - definitely needs update from stopevent; having headaches coding #101

Open
drallensmith opened this issue Jul 23, 2017 · 26 comments · May be fixed by #125

@drallensmith
Contributor

drallensmith commented Jul 23, 2017

@bennr01: Hi. I wanted to make sure that distributed evaluation works with checkpointing (classes that aren't defined at the top level of a module can be a problem for pickling, such as _EvaluatorSyncManager, which is one reason for my concern), so I put together a test_xor_example_distributed.py test case, which you can see in my fork's config_work branch. However, with no alterations to distributed.py, it usually hit a timeout error while waiting for the stopevent to connect. The one time it didn't, it thought the DistributedEvaluator was still running (since de.started was not reset to False by de.stop()), and so errored out anyway. The fix for the latter error is easy; the former, assuming it's a problem with using an event instead of a Value, is another matter.
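
For reference, the fix for the latter is essentially a one-line reset at the end of stop() - sketched very loosely below; names are abbreviated from memory rather than copied from distributed.py:

```python
# Loose sketch, not the actual distributed.py code.
class DistributedEvaluator(object):
    # ...
    def stop(self, wait=1, shutdown=True):
        """Stop all secondaries and allow the evaluator to be started again later."""
        if not self.started:
            raise RuntimeError("Not yet started!")
        # ... signal the secondaries to stop and (optionally) shut the manager down ...
        self.started = False  # the missing reset; without it a later start() errors out
```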

I've been trying to make the changeover to stopevent but am having some major headaches getting a proxy value distributed between multiple processes that may be on different machines (my initial try, putting a Value from syncmanager into the queue as the first entry, would have worked... except that the other secondary processes didn't get the Value! Oops....). Any suggestions? Using namespace isn't working either - that was my most recent try, replacing the Value.

I also changed the "master/slave" terminology to "primary/secondary", which is more modern usage.

@bennr01
Contributor

bennr01 commented Jul 23, 2017

Hi @drallensmith, sorry for causing you some headaches.
I think most of the issues can be fixed by moving _EvaluatorSyncManager into the module namespace and also defining another class which interacts with the _EvaluatorSyncManager. The reason _EvaluatorSyncManager is currently defined in _start_* is that it needs access to a namespace which is not cleared when the method returns. Using another class for interacting with the _EvaluatorSyncManager should allow us to use the namespace/attributes of that class instead of the namespace/attributes of the DistributedEvaluator. If this class defines __reduce__, it should be possible to instruct pickle to create a new manager upon checkpoint loading instead of trying to load the old manager.
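
Roughly what I have in mind - just an untested sketch, with placeholder names:

```python
# Untested sketch; class and attribute names are placeholders.
from multiprocessing import managers

class _EvaluatorSyncManager(managers.SyncManager):
    """Now defined at module level (queue registrations omitted here)."""
    pass

class _ExtendedManager(object):
    """Owns the sync manager and knows how to recreate it after unpickling."""

    def __init__(self, addr, authkey, start=False):
        self.addr = addr
        self.authkey = authkey
        self.manager = None
        if start:
            self.start()

    def __reduce__(self):
        # Instruct pickle to build a fresh, already-started wrapper on load
        # instead of trying to serialize the live manager and its connections.
        return (self.__class__, (self.addr, self.authkey, True))

    def start(self):
        self.manager = _EvaluatorSyncManager(address=self.addr, authkey=self.authkey)
        self.manager.start()  # or .connect(), depending on primary/secondary mode
```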

I've been trying to make the changeover to stopevent but am having some major headaches getting a proxy value distributed between multiple processes that may be on different machines (my initial try, putting a Value from syncmanager into the queue as the first entry, would have worked... except that other secondary processes didn't get the Value! Oops....). Any suggestions? Using namespace isn't working.

We could try to request the value manually every time _secondary_loop checks it.
I can try to make the changes. Should I fork your config_work branch and create a PR to your fork when/if I am successful? That would reduce the number of pull requests and prevent another merge conflict.

I also changed the "master/slave" terminology to "primary/secondary", which is more modern usage.

Primary/secondary is definitely better than master/slave (and even better than server/client, which I used before master/slave).

Regarding #51: I now have a small ARM cluster for testing, but setting it up will take some more time.

@drallensmith
Contributor Author

drallensmith commented Jul 23, 2017

Forking it sounds fine, and no problem re the headaches - thanks for your contribution!

At this point I am actually more concerned about the timeouts and similar errors than pickling - with the exception of the one due to self.started not being set to False by de.stop, they come prior to any attempt to "resurrect" the code. It may be most informative for you to try running the test_xor_example_distributed.py file with a mostly-unmodified (just change the terminology) distributed.py version (oops on my part for not cleanly committing just the terminology change - the commit before the most recent two, starting with 42c878, may be most useful).

I'm glad you like the primary/secondary - it isn't quite server/client either, since the secondary nodes aren't making requests of the primary node. The ARM cluster is good news; @D0pa might also be able to help on that.

@evolvingfridge
Contributor

I want to contribute but fail to understand the following:
What is the advantage and purpose of using Python multiprocessing when an experiment requires large-scale simulation, when MPI solves this problem much more easily (at least for me) and allows multiple setups such as a microservices environment or a distributed population?

Additionally, I see the biggest future advantage of using MPI being Xeon Phi cards, which hopefully will become more affordable.

@drallensmith
Contributor Author

I understand your question, and it's a good one - see the comment I just made on #51 for some thoughts on the matter.

@drallensmith
Contributor Author

drallensmith commented Jul 24, 2017

@bennr01 - I have just uploaded a new commit to my fork's config_work branch; I went back to using a Value for a do_stop proxy, this time establishing a separate queue for it to get to the secondaries (and be returned by them), plus using dict to figure out that one has to check the do_stop._value attribute on the secondaries (on the primary, just set do_stop to True).

However, test_distributed.py now either hangs on trying to join the single-worker secondary node (if not trying the multiple-worker version), or the child evaluation process (multiple workers) gets an EOFError from the inqueue being closed. (The latter may be fixable by "telling" it this is the equivalent of do_stop being turned on... I am admittedly a bit puzzled as to why the code currently treats managers.RemoteError as if it were queue.Empty - this makes sense for the primary, but I'm not sure it does for the secondaries.)

@bennr01
Contributor

bennr01 commented Jul 24, 2017

@D0pa The main advantage of using multiprocessing instead of mpi4py is that multiprocessing does not need any other dependencies. Most systems which have Python installed also have the multiprocessing module available. mpi4py can easily be installed using pip, but it seems to need some dependencies (libmpich-dev). I have heard that some universities make it rather hard to get the root access which may be required to install these dependencies.
The second advantage is that mpi4py seems to require more code. Comparing evolve-feedforward-distributed.py and xor-feedforward-mpi-data-scatter.py (from https://github.com/CodeReclaimers/neat-python/pull/72/files), evolve-feedforward-distributed.py has 85 fewer lines of code than xor-feedforward-mpi-data-scatter.py (however, xor-feedforward-mpi-data-scatter.py has a longer docstring and evolve-feedforward-distributed.py has command-line argument support for testing purposes, so this is not an ideal measurement).
I would like to say that the third advantage is easier integration into existing code, but this varies between users. Adding support for clusters using distributed requires 3 additional lines of code and a modification to the line containing the population.run call. However, if someone writes an evaluation function which actually requires being run on a cluster, that person would have written his/her code with this in mind, so this is not really an advantage. Most people using clusters will have more experience with mpi4py than with distributed/multiprocessing, which makes it easier for them to use mpi4py. neat.distributed was written for inexperienced people who need to run some evaluations on a cluster without much technical knowledge.
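
To illustrate the "3 additional lines" point, the intended usage looks roughly like this (a sketch only, written from memory - argument names and order may differ from the actual module):

```python
# Rough usage sketch; argument names may differ from the real API.
import neat
from neat.distributed import DistributedEvaluator, MODE_AUTO

def eval_genome(genome, config):
    # user-supplied evaluation of a single genome; must return its fitness
    net = neat.nn.FeedForwardNetwork.create(genome, config)
    return 1.0  # placeholder fitness

config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                     neat.DefaultSpeciesSet, neat.DefaultStagnation,
                     "config-feedforward")
p = neat.Population(config)

de = DistributedEvaluator(
    ("localhost", 8022),  # address the primary listens on / secondaries connect to
    b"some-password",     # authkey shared by all nodes
    eval_genome,          # evaluates one genome at a time
    mode=MODE_AUTO,       # primary vs. secondary decided from the address
)
de.start()                # secondaries run their work loop in here; the primary returns
winner = p.run(de.evaluate, 300)
de.stop()
```
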
Obviously, there are also some disadvantages of using multiprocessing instead of mpi4py. One is the Xeon Phi cards (and other technology requiring MPI). Probably the biggest disadvantage is performance. I have not measured it, but MPI is probably faster than multiprocessing.managers. But if the data transfer rate between the nodes really matters in comparison with the evaluation time, then the evaluation would probably be faster using neat.parallel instead of a cluster, wouldn't it?

The reason I chose to use multiprocessing.managers instead of mpi4py was not related to the advantages and disadvantages. When I looked for a way to run neat-python on a cluster, I stumbled upon your PR #72. The PR was closed and not merged, so I thought there was some reason it was not merged. I do not understand most of the MPI code in your MPI example, so I wanted to either split most of the code into a separate module for easier re-usability or create a completely new module. Like I said, because the PR was closed, I thought there was some reason it was not merged and concluded that using mpi4py might not be the best idea, so I did not even try it.

Thanks for offering your help in both this issue and in #51. My cluster is actually already set up; the only issue is that pip install takes more time than it should.

@drallensmith The real manager and all objects it holds are stored in a separate child process (according to the multiprocessing documentation). Because of this, both the primary and secondary nodes are clients connected to the process holding the manager. When we try to get an element from the queue, the manager process requests the element. If the queue raises queue.Empty, the exception is caught and serialized. The client which requested the element creates a new exception from the serialized data. This new exception may not be of the same type as the original exception, but instead a RemoteError containing the error message and the traceback.
I just took a look at your newest commit to config_work. I am not sure why the code blocks, but the secondary nodes first receive the stop value and then put it back into the queue. Maybe this leads to some problems, as the stop value put back into the queue may not be the same one the primary node put into the queue.

@drallensmith
Contributor Author

Good point regarding the RemoteError. Perhaps except managers.RemoteError as e: followed by checking repr(e) (or the appropriate attribute of e) for the string Empty, and also for EOFError; the latter should result in the equivalent of a stop event (and ditto if an actual EOFError is received by a secondary). I will try to work on this later today, but am not sure when I will be able to get to it.
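
Something along these lines - an untested sketch, where get_job and StopSecondary are names made up for illustration:

```python
# Untested sketch of the RemoteError-inspection idea; names are made up.
import queue  # the Queue module on Python 2
from multiprocessing import managers

class StopSecondary(Exception):
    """Signal that this secondary should leave its work loop."""

def get_job(inqueue, timeout=0.2):
    """Return a job, None if the queue is empty, or raise StopSecondary on EOF-like errors."""
    try:
        return inqueue.get(block=True, timeout=timeout)
    except queue.Empty:
        return None
    except EOFError:
        raise StopSecondary()       # a dead connection acts like a stop event
    except managers.RemoteError as e:
        text = repr(e)
        if "Empty" in text:
            return None             # the remote side raised queue.Empty
        if "EOFError" in text:
            raise StopSecondary()
        raise                       # anything else is a real error
```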

Regarding putting back the stop value: since there is also code for the primary node to put it onto the queue again if the queue is empty, the secondaries' put-back of the stop value could be deleted as a check.

@drallensmith
Contributor Author

OK, I implemented the first of the above. test_distributed.py is generally working... and I realized a problem with restoring from a checkpoint the way I was trying in test_xor_example_distributed.py - in the current setup, while run_primary has the information needed to restore (such as the generation of the last checkpoint), it doesn't have the information on how to recreate the secondary processes/nodes. Without those, if you try to restart it, it hangs. I'm currently seeing if exit_on_stop=False will help.

@drallensmith
Contributor Author

drallensmith commented Jul 24, 2017

No, still hangs. After travis gets through running all the tests except test_xor_example_distributed.py, I'll upload a version with that test so that people can explore the problem more.

BTW, I changed the queues to multiprocessing.Queue - queue.Queue, as it turns out, is not multiprocess-safe. There's also a close() method on the multiprocessing version which may be needed.
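
For anyone following along, here is a minimal illustration of the multiprocessing.Queue API in question (not the distributed.py code itself):

```python
# Minimal illustration; not the actual distributed.py code.
import multiprocessing

def worker(q):
    q.put("result")
    q.close()          # flush and close this process's end of the queue
    q.join_thread()    # wait for the background feeder thread to finish sending

if __name__ == "__main__":
    q = multiprocessing.Queue()   # process-safe; queue.Queue is only thread-safe
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    print(q.get())
    p.join()
    q.close()
```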

@drallensmith
Contributor Author

drallensmith commented Jul 24, 2017

Huh. travis is having a problem with anything above Python 3.5 - authentication errors. It's also claiming that manager in manager.RemoteError is not defined... ah, a misspelling; it should be managers. Fixing.

@drallensmith
Contributor Author

Currently, anything above 3.5 (3.6, etc.) is timing out, and when the process is interrupted, it reports an AuthenticationError. If that comes under RemoteError, then the primary node would ignore it, starting a loop... but why didn't the secondary nodes exit? Or did they?

@drallensmith
Contributor Author

I've gone ahead and uploaded a version with test_xor_example_distributed.py, while letting the other travis build run out. I also added -v to nosetests so we can see a bit more detail.

@bennr01
Contributor

bennr01 commented Jul 24, 2017

@drallensmith the "digest sent was rejected" error is raised when connection.recv_bytes(1024) != b"#WELCOME#". While connection.recv_bytes() should normally return the response, the default socket behavior is to return an empty string when the connection is closed. If recv_bytes() uses socket.socket().recv(), the AuthenticationError could be explained by a closed connection. I will have to check the source of connection.recv_bytes() to verify this.

@drallensmith
Contributor Author

Good thinking. Of course, at least part of the question then would be "what changed with 3.6+ to give this error when it didn't happen before". Perhaps s.recv_bytes() changed?

@drallensmith
Contributor Author

drallensmith commented Jul 24, 2017

Huh. Looks like pypy, as well as following the 2.7.13 version of the language, also follows it in networking internals - it did not error out for test_distributed.py.

Ditto for pypy3 following 3.5.3 in networking. So it looks like the difference is in Python-level code, not C/C++ internals.

@drallensmith
Contributor Author

drallensmith commented Jul 24, 2017

Well, I'm no longer seeing AuthenticationErrors from the secondaries - but, as far as I can tell, the primaries are still hanging. Sigh... see here for the travis build.

@drallensmith
Contributor Author

At least one question that needs solving is: What should the primary node do if there's an error that likely indicates the secondaries are out of contact? In particular, if they are out of contact (pipe for queue broken, socket closed, whatever) but are actually still running (exit_on_stop=False, for instance)? Would it work to re-get the queue in question from the manager?

@bennr01
Contributor

bennr01 commented Jul 25, 2017

@drallensmith

What should the primary node do if there's an error that likely indicates the secondaries are out of contact? In particular, if they are out of contact (pipe for queue broken, socket closed, whatever) but are actually still running (exit_on_stop=False, for instance)?

I think the behavior of the primary node should depend on whether the error is recoverable or not. If the secondary nodes are offline, but the evaluation could continue if a new secondary connects to the primary node, then the error should probably simply be ignored (sometimes some nodes of a cluster may reboot, especially if the cluster relocates processes according to node usage).
If the error is critical (the evaluation cannot continue), then the primary node should raise an exception (it can't continue the evaluation, and a silent exit may cause confusion).
It would be nice if we could try to restart the evaluation from the last generation, but this may be hard to implement. Maybe I will try implementing it when I finally have some time.
I am not sure what you mean by the exit_on_stop=False argument. exit_on_stop controls the call to sys.exit(0) on the secondary nodes. The intended behavior for exit_on_stop=False is that the call to DistributedEvaluator.start() returns instead of calling sys.exit(0). Personally, I don't really see a reason anyone would want to set exit_on_stop to False, but as a call to DistributedEvaluator.start() is required for distributed evaluation, it would be weird if it were not possible to disable the call to sys.exit(0).

Would it work to re-get the queue in question from the manager?

It seems like re-getting the queue from the manager would create a new connection, so the new queue would be connected again. However, any items consumed from the first queue but not processed may result in a frozen primary node (if I recall correctly, the primary node keeps track of the number of genomes it still has to wait for, so a missing item would result in an infinite loop).

I just had an idea about the problem with the stopevent. Your idea for using a queue for checking if the secondary nodes should stop is fine, but why do we actually get() the stopevent from the queue? Wouldn't it be enough for every node to request the eventqueue once and check if it is empty as the condition for executing the loops? In this case, we could just keep the eventqueue empty and put an element into it when stop() is called.
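
Something like this, as an untested sketch (secondary_loop and evaluate_batch are placeholder names, and the usual timeout/error handling is omitted):

```python
# Untested sketch of the "empty event queue means keep running" idea.
# 'stopqueue' is a manager-proxied queue that stays empty until stop() is called.

def secondary_loop(stopqueue, inqueue, outqueue, evaluate_batch):
    while stopqueue.empty():          # no stop token yet, so keep working
        genomes = inqueue.get()       # plus the usual timeout/RemoteError handling
        outqueue.put(evaluate_batch(genomes))

def stop(stopqueue):
    stopqueue.put(True)               # one token stops every node that checks empty()
```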

Well, I'm no longer seeing AuthenticationErrors from the secondaries - but, as far as I can tell, the primaries are still hanging. Sigh... see here for the travis build.

Have you checked if these errors also appear outside of travis?
If this continues without visible cause, I'll try to implement the idea with a wrapper around the manager.

@drallensmith
Contributor Author

drallensmith commented Jul 25, 2017

Regarding being able to restart the evaluation, or at least the population - that's pretty much exactly what I was testing for with checkpointing; it does work with non-distributed NEAT currently, and I suspect that anyone wanting to do (serious) distributed computing is going to have a big enough task at hand that the loss due to not checkpointing will be all the more unacceptable.

If the primary detects that it needs to reinitialize the connections to the secondaries, then it should probably put any genomes that it sent out and didn't get fitnesses back for into a pool that, once the other genomes are done, is sent out again if it still hasn't received fitnesses for them. Alternatively or as well, if the secondaries keep track of what fitness information they've gathered so far and don't automatically shut down or otherwise become unavailable, there could be some mechanism to ask them for it (or even simply have them send back a pickled dict of genome id vs. fitness as the first item in the queue going back to the primary node once a connection is re-established; isinstance or similar should be usable to tell this apart from the usual secondary responses). A rough sketch of the pending-pool bookkeeping follows.
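
Just a sketch of the bookkeeping, not working code; collect_fitnesses and send_job are hypothetical names:

```python
# Sketch of the "pending pool" bookkeeping on the primary; not working code.
import queue

def collect_fitnesses(genomes, outqueue, send_job, timeout=5):
    """genomes is a list of (genome_id, genome) pairs; send_job pushes one job to the secondaries."""
    pending = dict(genomes)
    for genome_id, genome in genomes:
        send_job(genome_id, genome)
    while pending:
        try:
            genome_id, fitness = outqueue.get(timeout=timeout)
        except queue.Empty:
            # connection may have been lost; re-send everything still unanswered
            for genome_id, genome in pending.items():
                send_job(genome_id, genome)
            continue
        if genome_id in pending:               # ignore duplicates caused by re-sends
            pending.pop(genome_id).fitness = fitness
```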

In regard to why not always stop the secondary process, several thoughts:

  • Startup time for the evaluation function. Depending on the evaluation function, setting it up may require significant time (loading in large files, doing preliminary calculations, or whatever). It may well be helpful to keep around the secondaries to try another run with adjusted, say, mutation rates. Another example - pypy should work rather faster if it's able to keep going for a while; for short runs, it's frequently slower than cpython.
  • Startup difficulty: In the current setup, as is probably needed to keep it independent of the local configuration, the distributed.py code has no way to restart the secondaries, for instance - it's dependent on the calling code for that, and it would be rather more convenient for the calling code if it didn't have to restart the run again and again, potentially from the start, because the secondaries exited.
  • If the connection between the secondary and the primary is lost, the original code would have the secondary in a loop; the current code at least sometimes detects this and exits the loop, but the question is then how to get the secondary reconnected - something that isn't possible if exiting the loop equals exiting the process.
  • If the secondary is still around, and has kept a cache of genome id vs fitness results, then if the primary sends it the genome in question again, it can just respond with the fitness.

The stopevent idea should work - good thought! The queues are what's currently working the best, as far as I can tell - but that's with queues that are continuously in use; it may be hard to tell the difference between an empty queue and a broken queue. Hmm... currently, one difficulty is that the primary doesn't actually know how many secondaries it is supposed to have available. If it had that information, it could just put a series of "stop" tokens on the queue going to the secondaries and wait to receive an equal number of acknowledging "I'm stopping" tokens on the return queue (sketched below). [Admittedly, some but not all of the secondaries going down could be a difficulty with this... hmm... I do note that the multiprocessing queue implementation, unlike the queue.Queue one, has the capability of a "close" command. Perhaps the primary could simply keep sending stop tokens until all the secondaries that were connected had closed their connections?]
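
A rough sketch of that token handshake, assuming the primary somehow knew num_secondaries (which it currently doesn't); primary_stop, STOP, and ACK are made-up names:

```python
# Untested sketch of the stop-token handshake; names are made up.
import queue

STOP = "#STOP#"         # sent by the primary, one per secondary
ACK = "#STOPPING#"      # sent back by each secondary as it shuts down

def primary_stop(inqueue, outqueue, num_secondaries, timeout=5):
    for _ in range(num_secondaries):
        inqueue.put(STOP)
    acked = 0
    while acked < num_secondaries:
        try:
            msg = outqueue.get(timeout=timeout)
        except queue.Empty:
            inqueue.put(STOP)       # a token may have been lost; send another
            continue
        if msg == ACK:
            acked += 1
```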

The errors (as in hangs) with the test_xor_example_distributed.py test script definitely happen outside of travis. I don't know about the hangs in test_distributed.py itself, which are only happening in Python 3.6+; locally I have set up 2.7.13, 3.5.3, pypy2-5.8.0, and pypy3-5.8.0 (although I usually only test locally with 2.7.13 and 3.5.3). I'm a bit low on disk & other space, but can try to set up 3.6.X in addition locally... but even if that part's a travis bug interacting with 3.6+ python, that still leaves the test_xor_example_distributed.py failures (in restarting after a checkpoint). Being able to restart after a checkpoint also tests the resilience of the connection/reconnection code about as well as could be done without using multiple machines (as in the best we can do with readily-available automatable testing, as far as I can tell).

Actual testing with more than one machine is also needed, however, to make sure the underlying code isn't taking shortcuts enabled by being on the same machine (e.g., queue.Queue is reliant on this, as far as I can tell).

@bennr01
Contributor

bennr01 commented Jul 26, 2017

@drallensmith I have pushed an updated version of distributed.py (based on your changes in config_work) to the distributed_rewrite branch in my fork. It is still not finished, but here are a few of the changes:

  • new stop mechanic
  • The manager is now contained in a new class
  • the DistributedEvaluator.start() method now has a reconnect argument, which causes the secondary nodes to reconnect if the connection is lost (untested, defaults to False)
  • The mode determination is now a separate function

However, the distributed+pickle test still fails (a gaierror is raised when the secondary nodes try to connect again) and some other work still needs to be done.

Regarding being able to restart the evaluation, or at least the population - that's pretty much exactly what I was testing for with checkpointing; it does work with non-distributed NEAT currently, and I suspect that anyone wanting to do (serious) distributed computing is going to have a big enough task at hand that the loss due to not checkpointing will be all the more unacceptable.

Good point.

@drallensmith
Contributor Author

Definite progress; the EOFErrors (and similar) need some work, but still definite progress. I wasn't getting anywhere but frustrated, so - after working on some non-online matters - decided to work on some other aspects for a bit; I've used mypy to help me check my multiparameter function code + the config_work documentation, and have indeed found some places needing changes.

@drallensmith
Contributor Author

I think I may have managed to make distributed.py a bit more reliable and faster to test. I was working on other things and being hindered by how long the various distributed.py tests were taking (and by the intermittent failures), so I investigated further using the -vd and -vsd flags for nosetests (plus some direct runs of the test code). It seems that there are frequent failures of 1+ threads in test_distributed.py, which unfortunately don't get reported by travis, etc due to the lack of error status feedback. I have attached a file with some examples of these and others (including tests that would have errored on travis due to the time they were taking).

I noticed that in all cases I was seeing, the common problem was a built-in timeout in connecting for updating the state - not in connecting for a queue. One can control the maximum time spent on trying to retrieve from (or put into) a queue, and keep trying again until it works; there is no such timeout control for various other proxy objects. I therefore tried out a combination of the "put a stop token on the queue" idea from above with some further checks on how long things are taking, most particularly how long it's been since the secondary processes/threads have successfully gotten a job or transmitted the results back.

@bennr01, could you and @D0pa please take a look at the code? It is in my fork's master branch. Quite a bit of it is unfortunately heuristic guesses as to good timeouts and other variables, albeit with the possibility of the caller specifying some of them; it is also rather complex code (McCabe cyclomatic complexity score of 33...). I am certain a lot of improvements are possible - for instance: sending some sort of "keepalive" token to the secondaries if it is desired to use them again; giving the user an option to tell the primary process/thread how many secondaries there are, and otherwise having more control over how many times the primary sends out a stop token; the idea I mentioned above regarding caching results in case of queue data loss; and having the secondaries, if they are told at the start of each generation roughly how many genomes they are expected to process, take this into account in deciding whether it has been too long since a contact. It also badly needs testing in an actual multi-machine situation. I am also going to investigate whether pypy/pypy3 can be gotten to work on the non-threading versions - I think I'll fork off a separate branch for that testing.

@bennr01
Contributor

bennr01 commented Aug 5, 2017

It seems that there are frequent failures of 1+ threads in test_distributed.py, which unfortunately don't get reported by travis, etc due to the lack of error status feedback.

Interesting... Thanks for investigating this.

I noticed that in all cases I was seeing, the common problem was a built-in timeout in connecting for updating the state - not in connecting for a queue.

It seems to me that we just handle the errors better for the queues than we do for the secondary_state. I checked the last version from my distributed_rewrite branch: when accessing the queue, we check for TimeoutErrors, but I forgot to add this check for getting the state.

One can control the maximum time spent on trying to retrieve from (or put into) a queue, and keep trying again until it works; there is no such timeout control for various other proxy objects.

I thought that the queue timeout would behave like the normal timeout for queue.Queue (raising an exception when there was no item to get from the queue in the specified time) and that the timeout we experience would be a socket timeout, which is unrelated to the queue timeout. Am I wrong?

I am certain a lot of improvements are possible - for instance: sending some sort of "keepalive" token to the secondaries if it is desired to use them again;

The intended behavior for the reconnection and re-usability is to always reconnect if reconnect==True unless explicitly stopped with force_secondary_shutdown==True. If we only send keepalive tokens when we want to use the secondaries again, how can we ensure they still reconnect if the primary node fails or is terminated?

Sigh... Maybe we should stop using multiprocessing and rewrite distributed.py to use sockets directly instead. What do you think?

BTW, in distributed.py in your master branch, you defined a method called _check_exception, which returns either 0, 1 or -1. You should probably avoid using integers directly and use constants with meaningful names instead. When checking the return value of _check_exception, avoid using the < and > operators and prefer == or in (...) instead. This makes the code much more readable.
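
For example, something along these lines (just a sketch; the constant names are only suggestions):

```python
# Sketch of the named-constant suggestion; constant names are just examples.
EXCEPTION_IGNORE = 0    # e.g. a remote queue.Empty: just retry
EXCEPTION_STOP = 1      # e.g. an EOFError: treat as a stop event
EXCEPTION_RERAISE = -1  # anything unrecognized

def _check_exception(exc):
    text = repr(exc)
    if "Empty" in text:
        return EXCEPTION_IGNORE
    if "EOFError" in text:
        return EXCEPTION_STOP
    return EXCEPTION_RERAISE

def _handle_remote_error(exc):
    kind = _check_exception(exc)
    if kind == EXCEPTION_IGNORE:        # '==' or 'in (...)', not '<' / '>'
        return
    if kind == EXCEPTION_STOP:
        raise SystemExit(0)
    raise exc
```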

@drallensmith
Contributor Author

drallensmith commented Aug 5, 2017

Quite welcome. I may have gotten pypy to work for non-threading tests.

I suspect, but definitely don't know for sure, that the queue timeout can, at least sometimes, cover more than just an empty queue because it's a timeout on the underlying recv or send, as appropriate. Admittedly, that would not explain doing better on connect, but that may also be a matter of the proxied booleans reconnecting (as in redoing connect) each time they are checked, while the queue connections, being expected to carry continuously changing data, are kept open.

I have not yet succeeded in getting the secondaries to reconnect if the primary manager shuts down. I'm currently trying out essentially using whether _reset_em() is successful as a test - that doesn't require that there is anything in the queues, but from observation of some errors it does require connect to work.

Good question re sockets; I wouldn't do that quite yet - socket apparently varies even more by OS than does multiprocessing, for one thing.

Thank you for pointing that out on _check_exception - good point. I'll correct that shortly.

@drallensmith
Contributor Author

Sigh... looks like pypy is still erratic. I may try to set up a .travis.yml file such that pypy + pypy3 are used twice - once without distributed tests, once for just those but allowed to fail - in order to get more data on what's happening.

@drallensmith
Contributor Author

@bennr01: I wanted to let you (and others) know that I tried making the queue.Queues multiprocessing.Queues instead, with close being called at appropriate points, and apparently managed to find an internal Python bug, causing mysterious AssertionErrors in the multiprocessing module. (At the minimum, it's a bug that it's not more informative than "AssertionError"...) See the multiprocessing_internal_error branch in my fork. I've put in a bug report at bugs.python.org.
