diff --git a/Makefile.am b/Makefile.am index 63d5c9e6..4a66abeb 100644 --- a/Makefile.am +++ b/Makefile.am @@ -361,6 +361,10 @@ noinst_PROGRAMS= \ mordor/examples/echoserver \ mordor/examples/iombench \ mordor/examples/simpleappserver \ + mordor/examples/simplehttpfileserver \ + mordor/examples/simplehttpfileuploadserver \ + mordor/examples/simplehttpserver \ + mordor/examples/sslterminator \ mordor/examples/tunnel \ mordor/examples/udpstats @@ -400,6 +404,34 @@ mordor_examples_simpleappserver_LDADD=mordor/libmordor.la \ $(SECURITY_FRAMEWORK_LIBS) \ $(SYSTEMCONFIGURATION_FRAMEWORK_LIBS) +mordor_examples_simplehttpfileserver_SOURCES=mordor/examples/simplehttpfileserver.cpp +mordor_examples_simplehttpfileserver_LDADD=mordor/libmordor.la \ + $(CORESERVICES_FRAMEWORK_LIBS) \ + $(COREFOUNDATION_FRAMEWORK_LIBS) \ + $(SECURITY_FRAMEWORK_LIBS) \ + $(SYSTEMCONFIGURATION_FRAMEWORK_LIBS) + +mordor_examples_simplehttpfileuploadserver_SOURCES=mordor/examples/simplehttpfileuploadserver.cpp +mordor_examples_simplehttpfileuploadserver_LDADD=mordor/libmordor.la \ + $(CORESERVICES_FRAMEWORK_LIBS) \ + $(COREFOUNDATION_FRAMEWORK_LIBS) \ + $(SECURITY_FRAMEWORK_LIBS) \ + $(SYSTEMCONFIGURATION_FRAMEWORK_LIBS) + +mordor_examples_sslterminator_SOURCES=mordor/examples/sslterminator.cpp +mordor_examples_sslterminator_LDADD=mordor/libmordor.la \ + $(CORESERVICES_FRAMEWORK_LIBS) \ + $(COREFOUNDATION_FRAMEWORK_LIBS) \ + $(SECURITY_FRAMEWORK_LIBS) \ + $(SYSTEMCONFIGURATION_FRAMEWORK_LIBS) + +mordor_examples_simplehttpserver_SOURCES=mordor/examples/simplehttpserver.cpp +mordor_examples_simplehttpserver_LDADD=mordor/libmordor.la \ + $(CORESERVICES_FRAMEWORK_LIBS) \ + $(COREFOUNDATION_FRAMEWORK_LIBS) \ + $(SECURITY_FRAMEWORK_LIBS) \ + $(SYSTEMCONFIGURATION_FRAMEWORK_LIBS) + mordor_examples_tunnel_SOURCES=mordor/examples/tunnel.cpp mordor_examples_tunnel_LDADD=mordor/libmordor.la \ $(CORESERVICES_FRAMEWORK_LIBS) \ diff --git a/README b/README index 8a321ecb..e697f697 100644 --- a/README +++ b/README @@ -1,175 +1 @@ -Mordor - -What is it? - -Mordor is a high performance I/O library. It is cross-platform, compiling -on Windows, Linux, and Mac (32-bit and 64-bit on all platforms). It includes -several main areas: - -* Cooperatively scheduled fiber engine, including synchronization primitives -* Streams library, for dealing with streams of data and manipulating them. -* HTTP library, building on top of Fibers and Streams, to provide a simple to - use, yet extremely powerful HTTP client and server API. -* Supporting infrastructure, including logging, configuration, statistics - gathering, and exceptions. -* A unit test framework that is lightweight, easy to use, but has several useful - features. - -One of the main goals of Mordor is to provide very easy to use abstractions and -encapsulation of difficult and complex concepts, yet still provide near absolute -power in weilding them if necessary. - -Where should it be used? - -Any software (server-side or client-side) that need to process a lot of data. -It is C++, so is probably overkill for something that could be easily handled -with a Python or Ruby script, but can be used for simpler tasks because it does -provide some nice abstractions that you won't see elsewhere. Server -applications handling lots of connections will benefit most from the Fiber -engine, by transforming an event-based paradigm into a familiar thread-based -paradigm, while keeping (and in some cases improving) the performance of an -event-based paradigm. - -How does it change the game? - -Mordor allows you to focus on performing a logical task, instead of deciding how -to make that task conform to a specific threading/event model. Just because -local disk I/O will block, and should be performed in a thread pool, and network -I/O should be performed using an event based callback design, doesn't mean you -can't do them both _in the same function_. Mordor allows you to do just that. -For example, here's a complete program to read a file from disk, and send it to -a socket on the network: - -#include - -#include -#include -#include -#include - -using namespace Mordor; - -int main(int argc, char **argv) -{ - if (argc != 3) { - std::cerr << "usage: " << argv[0] << " " << std::endl; - return 1; - } - try { - std::vector addresses = Address::lookup(argv[2], AF_UNSPEC, SOCK_STREAM); - Socket::ptr socket = addresses[0]->createSocket(); - socket->connect(addresses[0]); - Stream::ptr fileStream(new FileStream(argv[1], FileStream::OPEN, FileStream::READ)); - Stream::ptr socketStream(new SocketStream(socket)); - transferStream(fileStream, socketStream); - } catch (...) { - std::cerr << boost::current_exception_diagnostic_information() << std::endl; - return 2; - } - return 0; -} - -This program is quite simple. It checks for usage, translates the string -argument into a network address, creates a socket that is compatible with that -address, connects to it, opens a file (as a stream), wraps the socket in a -stream, and then sends the file over the socket. If an error occurs, complete -error information is printed on stdout, including the type of error, the OS -level error code and description (if applicable), and a complete stacktrace of -the error, including debug symbol information, if available. Looking at it, we -can see that there is only a single thread. Which is all fine and dandy if -this is all we're doing. But what if instead we were sending 1000 files to -1000 different sockets, but didn't want to create a thread for each one? Let's -say we want one thread for communicating with the network, and four threads for -reading the file off the disk. Let's do it! - -#include - -#include -#include -#include -#include -#include -#include - -using namespace Mordor; - -static void doOne(const char *file, const char *destination, IOManager &ioManager, Scheduler &scheduler) -{ - try { - std::vector addresses = Address::lookup(destination, AF_UNSPEC, SOCK_STREAM); - Socket::ptr socket = addresses[0]->createSocket(ioManager); - socket->connect(addresses[0]); - Stream::ptr fileStream(new FileStream(file, FileStream::READ, FileStream::OPEN, &ioManager, &scheduler)); - Stream::ptr socketStream(new SocketStream(socket)); - transferStream(fileStream, socketStream); - } catch (...) { - std::cerr << boost::current_exception_diagnostic_information() << std::endl; - } -} - -int main(int argc, char **argv) -{ - if (argc % 2 != 1) { - std::cerr << "usage: " << argv[0] << " ( )*" << std::endl; - return 1; - } - IOManager ioManager; - WorkerPool workerPool(4, false); - for (int i = 1; i < argc; i += 2) - ioManager.schedule(boost::bind(&doOne, argv[i], argv[i + 1], boost::ref(ioManager), boost::ref(workerPool))); - ioManager.dispatch(); - - return 0; -} - -So we re-factored most of main into doOne, but other than that it is nearly -identical. And it will transfer as many files as you pass on the command line -in parallel. Using 5 threads. The IOManager is the object used for -non-blocking network I/O, and so is passed to the Socket when it is created. -WorkerPool is just a generic thread pool, and is passed to the FileStream so -that it will automatically do its work on those threads, instead of the thread -it is running on when it is called. Something to point out here is that when -the work is scheduled on the IOManager, each bit of work implicitly creates a -Fiber - a lightweight, cooperatively scheduled, user-mode thread. The doOne -function is executed on its own Fiber, and is allowed to switch which thread it -is running on (inside of FileStream), without having to do any callbacks, -virtual functions on a defined interface, or anything else. Internally, when -FileStream wants to execute on the thread pool, it suspends the current Fiber, -allowing other Fibers to run on this thread, and is resumed on a thread in the -WorkerPool. IOManager and WorkerPool both inherit from Scheduler, which is the -base functionality for cooperatively scheduling Fibers. Pretty cool, eh? - - -Dependencies - -boost 1.40 -OpenSSL -Zlib -Ragel (compile-time only) - - -Compiling for iPhone SDK - -The iPhone SDK does not include OpenSSL headers or binaries. Since mordor relies -on OpenSSL, you must provide these files yourself. The Xcode project file is -configured to look for files in an iphone directory in the same directory as the -project file. - -Specifically, the compiler will look for headers in iphone/include/ and -libraries in iphone/lib/. We recommend you create a symbolic link called -"iphone" which points to the actual directory containing the include/ and lib/ -directories. - - -License - -Mordor is licensed under the New BSD License, and Copyright (c) 2009, Decho Corp. -See LICENSE for details. - -Authors - -Cody Cutrer (cody@mozy.com) -Patrick Bozeman (peb@mozy.com) -Jeremy Stanley (jeremy@mozy.com) -Zach Wily (zach@mozy.com) -Brian Palmer (brian@mozy.com) +look@README.md diff --git a/README.md b/README.md new file mode 100644 index 00000000..7cb710f5 --- /dev/null +++ b/README.md @@ -0,0 +1,272 @@ +# Mordor + +## What is it? + +Mordor is a high performance I/O library. It is cross-platform, compiling +on Windows, Linux, and Mac (32-bit and 64-bit on all platforms). It includes +several main areas: + +* Cooperatively scheduled fiber engine, including synchronization primitives +* Streams library, for dealing with streams of data and manipulating them. +* HTTP library, building on top of Fibers and Streams, to provide a simple to + use, yet extremely powerful HTTP client and server API including SSL/TLS. +* Supporting infrastructure, including logging, configuration, statistics + gathering, and exceptions. +* A unit test framework that is lightweight, easy to use, but has several useful + features. + +One of the main goals of Mordor is to provide very easy to use abstractions and +encapsulation of difficult and complex concepts, yet still provide near absolute +power in weilding them if necessary. + +## Where should it be used? + +Any software (server-side or client-side) that need to process a lot of data. +It is C++, so is probably overkill for something that could be easily handled +with a Python or Ruby script, but can be used for simpler tasks because it does +provide some nice abstractions that you won't see elsewhere. Server +applications handling lots of connections will benefit most from the Fiber +engine, by transforming an event-based paradigm into a familiar thread-based +paradigm, while keeping (and in some cases improving) the performance of an +event-based paradigm. + +## How does it change the game? + +Mordor allows you to focus on performing a logical task, instead of deciding how +to make that task conform to a specific threading/event model. Just because +local disk I/O will block, and should be performed in a thread pool, and network +I/O should be performed using an event based callback design, doesn't mean you +can't do them both _in the same function_. Mordor allows you to do just that. +For example, here's a complete program for simple http file server (mordor/examples/simplehttpfileserver.cpp): + + + #include "mordor/predef.h" + + #include + + #include + + #include "mordor/config.h" + #include "mordor/http/server.h" + #include "mordor/iomanager.h" + #include "mordor/main.h" + #include "mordor/socket.h" + #include "mordor/streams/file.h" + #include "mordor/streams/socket.h" + #include "mordor/streams/transfer.h" + #include "mordor/streams/ssl.h" + + using namespace Mordor; + + static void httpRequest(HTTP::ServerRequest::ptr request) + { + const std::string &method = request->request().requestLine.method; + const URI &uri = request->request().requestLine.uri; + + if (method == HTTP::GET) { + FileStream::ptr stream(new FileStream(uri.path.toString(), FileStream::READ, FileStream::OPEN, static_cast(Scheduler::getThis()), Scheduler::getThis())); + HTTP::respondStream(request, stream); + } else { + HTTP::respondError(request, HTTP::METHOD_NOT_ALLOWED); + } + } + + void serve(Socket::ptr listen, bool ssl) + { + IOManager ioManager; + SSL_CTX* ssl_ctx = NULL; + + if (ssl) + ssl_ctx = SSLStream::createSSLCTX(); + + while (true) + { + Socket::ptr socket = listen->accept(&ioManager); + SocketStream::ptr socketStream(new SocketStream(socket)); + + Stream::ptr stream; + + if (ssl) + { + SSLStream::ptr sslStream(new SSLStream(socketStream, false, true, ssl_ctx)); + sslStream->accept(); + stream = sslStream; + } + else + { + stream = socketStream; + } + + HTTP::ServerConnection::ptr conn(new HTTP::ServerConnection(stream, &httpRequest)); + Scheduler::getThis()->schedule(boost::bind(&HTTP::ServerConnection::processRequests, conn)); + } + } + + MORDOR_MAIN(int argc, char *argv[]) + { + try { + Config::loadFromEnvironment(); + IOManager ioManager; + + Socket::ptr httpSocket = Socket::ptr(new Socket(ioManager, AF_INET, SOCK_STREAM)); + IPv4Address httpAddress(INADDR_ANY, 8080); + httpSocket->bind(httpAddress); + httpSocket->listen(); + + Socket::ptr httpsSocket = Socket::ptr(new Socket(ioManager, AF_INET, SOCK_STREAM)); + IPv4Address httpsAddress(INADDR_ANY, 8443); + httpsSocket->bind(httpsAddress); + httpsSocket->listen(); + + boost::thread serveThread1(serve, httpSocket, false); + boost::thread serveThread2(serve, httpSocket, false); + boost::thread serveThread3(serve, httpsSocket, true); + boost::thread serveThread4(serve, httpsSocket, true); + serveThread1.join(); + serveThread2.join(); + serveThread3.join(); + serveThread4.join(); + + } catch (...) { + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + } + return 0; + } + +The IOManager is the object used for non-blocking network I/O, and so is passed to the Socket when it is created. The FileStream need a scheduler object to schedule the non-blocking file I/O, so in that case it use the scheduler of network I/O, so the scheduling of incoming connections/processing and read from file system play in the same scheduler without any callbacks! +Something to point out here is that when the work is scheduled on the IOManager, each bit of work implicitly creates a Fiber - a lightweight, cooperatively scheduled, user-mode thread. +In that example we have 2 native threads handlers for each HTTP/HTTPS. + + +## Dependencies + +* boost 1.49 +* OpenSSL +* Zlib +* Ragel (compile-time only) +* tcmalloc (optional) + +## Building + + ~/mordor/buildtools/build.sh + +for debug: + + ~/mordor/buildtools/build.sh debug + +## Benchmark + +So I follow the benchmark from Monkey website (Benchmark: Monkey v/s GWan in a Linux 64 bit platform) +http://monkey-project.com/benchmarks/x86_64_monkey_gwan + +and perform the benchmark myself on file with sizeof 1312 bytes. + +I run the test 3 times for each server and choose the best one. + +## Monkey vs GWan vs original Mordor vs cmpxchg16/Mordor + +Environment: + +Intel board, some details: + +* Kernel : 3.5.0 - x86_64 (Ubuntu 12.10) +* CPU : Intel(R) Core(TM) i7-3720QM CPU @ 2.60GHz (4 cores) +* RAM : 4 GB +* Filesystem: ext4 on a SSD + +### GWan: + + ** SIEGE 2.70 + ** Preparing 500 concurrent users for battle. + The server is now under siege... + Lifting the server siege... done. + Transactions: 66914 hits + Availability: 100.00 % + Elapsed time: 9.68 secs + Data transferred: 31.46 MB + Response time: 0.07 secs + Transaction rate: 6912.60 trans/sec + Throughput: 3.25 MB/sec + Concurrency: 490.04 + Successful transactions: 66914 + Failed transactions: 0 + Longest transaction: 1.00 + Shortest transaction: 0.00 + +### Monkey: + + ** SIEGE 2.70 + ** Preparing 500 concurrent users for battle. + The server is now under siege... + Lifting the server siege... done. + Transactions: 73447 hits + Availability: 100.00 % + Elapsed time: 9.46 secs + Data transferred: 95.54 MB + Response time: 0.06 secs + Transaction rate: 7763.95 trans/sec + Throughput: 10.10 MB/sec + Concurrency: 490.98 + Successful transactions: 73448 + Failed transactions: 0 + Longest transaction: 0.77 + Shortest transaction: 0.00 + +### Original Mordor: + + ** SIEGE 2.70 + ** Preparing 500 concurrent users for battle. + The server is now under siege... + Lifting the server siege... done. + Transactions: 20882 hits + Availability: 100.00 % + Elapsed time: 9.06 secs + Data transferred: 26.13 MB + Response time: 0.18 secs + Transaction rate: 2304.86 trans/sec + Throughput: 2.88 MB/sec + Concurrency: 410.50 + Successful transactions: 20882 + Failed transactions: 0 + Longest transaction: 7.39 + Shortest transaction: 0.01 + +### cmpxchg16/Mordor: + + ** SIEGE 2.70 + ** Preparing 500 concurrent users for battle. + The server is now under siege... + Lifting the server siege... done. + Transactions: 67896 hits + Availability: 100.00 % + Elapsed time: 9.62 secs + Data transferred: 88.32 MB + Response time: 0.07 secs + Transaction rate: 7057.80 trans/sec + Throughput: 9.18 MB/sec + Concurrency: 488.34 + Successful transactions: 67896 + Failed transactions: 0 + Longest transaction: 1.42 + Shortest transaction: 0.00 + + +* cmpxchg16/Mordor 3x than original Mordor +* cmpxchg16/Mordor slightly better from GWan +* Monkey slightly better + +**But! don't forget that Mordor without the callbacks hell! you write synchronous network/file I/O, and under the hood it's asynchronous.** + +## Important Notes: + +* To gain more performance, the locks in scheduling executions was eliminated, the core was changed so Scheduler can run only on one native thread and doesn't create native threads, yet not all the examples was migrated to that model, so it could be that some examples will not work properly due to assertion that validate the model (migrated examples::simplefileserver, simpleappserver, echoserver) +* The core include stacks pool, so we reuse stacks and eliminate system calls for new/delete stack, the size of the pool can be configured and it's should be fine tune because its affect on performance + +## License + +Mordor is licensed under the New BSD License +See LICENSE for details. + +## Authors + +Uri Shamay (shamayuri@gmail.com) diff --git a/buildtools/build.sh b/buildtools/build.sh index fa71a5a8..6f60c2e3 100755 --- a/buildtools/build.sh +++ b/buildtools/build.sh @@ -5,6 +5,7 @@ if [ ! -x configure ]; then fi CXXFLAGS= +TCMALLOC= if [ `uname` = 'Darwin' ] ; then CXXFLAGS="-Wno-deprecated-declarations" fi @@ -19,10 +20,12 @@ if [ ! -f Makefile ]; then VALGRINDFLAGS=--enable-valgrind=yes else ASSERTFLAGS=--disable-assert + CXXFLAGS+=' -O3' + TCMALLOC=--with-tcmalloc=yes fi export CXXFLAGS which pg_config >/dev/null || POSTGRESFLAGS=--without-postgresql - ./configure --disable-shared $ASSERTFLAGS $POSTGRESFLAGS $VALGRINDFLAGS + ./configure --disable-shared $ASSERTFLAGS $POSTGRESFLAGS $VALGRINDFLAGS $TCMALLOC fi if [ $(uname) = 'Linux' ]; then j=$(grep processor /proc/cpuinfo | wc -l) diff --git a/configure.ac b/configure.ac index c8335970..028b765b 100644 --- a/configure.ac +++ b/configure.ac @@ -40,6 +40,19 @@ AX_CHECK_SECURITY_FRAMEWORK AX_CHECK_SYSTEMCONFIGURATION_FRAMEWORK AX_LIB_POSTGRESQL AM_CONDITIONAL([HAVE_POSTGRESQL], [test "x${POSTGRESQL_VERSION}" != x]) + + +AC_ARG_WITH(tcmalloc, + [AC_HELP_STRING([--with-tcmalloc], [(optional) link with tcmalloc; default is no])], + [with_tcmalloc=$withval], + [with_tcmalloc=no] + ) + +if test "x$with_tcmalloc" != 'xno' +then + AC_CHECK_LIB([tcmalloc_minimal], [malloc], [], [AC_MSG_ERROR([Cannot find tcmalloc])]) +fi + AC_ARG_WITH(yaml, [AS_HELP_STRING([--with-yaml=[[ARG]]], [Enable YAML parsing support @<:@default=check@:>@])], diff --git a/mordor/examples/echoserver.cpp b/mordor/examples/echoserver.cpp index d54ff1ec..504e6433 100644 --- a/mordor/examples/echoserver.cpp +++ b/mordor/examples/echoserver.cpp @@ -3,22 +3,21 @@ #include "mordor/predef.h" #include +#include #include "mordor/config.h" #include "mordor/daemon.h" -#include "mordor/http/multipart.h" -#include "mordor/http/server.h" #include "mordor/iomanager.h" #include "mordor/main.h" #include "mordor/socket.h" -#ifdef WINDOWS -#include "mordor/streams/namedpipe.h" -#endif #include "mordor/streams/socket.h" #include "mordor/streams/transfer.h" +#include "mordor/streams/ssl.h" + using namespace Mordor; + void streamConnection(Stream::ptr stream) { try { @@ -30,10 +29,10 @@ void streamConnection(Stream::ptr stream) void socketServer(Socket::ptr listen) { - listen->listen(); + IOManager ioManager; while (true) { - Socket::ptr socket = listen->accept(); + Socket::ptr socket = listen->accept(&ioManager); Stream::ptr stream(new SocketStream(socket)); Scheduler::getThis()->schedule(boost::bind(&streamConnection, stream)); } @@ -51,110 +50,35 @@ void startSocketServer(IOManager &ioManager) Scheduler::getThis()->schedule(boost::bind(&socketServer, s)); } -#ifndef WINDOWS UnixAddress echoaddress("/tmp/echo"); Socket::ptr s = echoaddress.createSocket(ioManager, SOCK_STREAM); s->bind(echoaddress); Scheduler::getThis()->schedule(boost::bind(&socketServer, s)); -#endif } -void httpRequest(HTTP::ServerRequest::ptr request) +int main(int argc, char *argv[]) { - const std::string &method = request->request().requestLine.method; - if (method == HTTP::GET || method == HTTP::HEAD || method == HTTP::PUT || - method == HTTP::POST) { - request->response().entity.contentLength = request->request().entity.contentLength; - request->response().entity.contentType = request->request().entity.contentType; - request->response().general.transferEncoding = request->request().general.transferEncoding; - request->response().status.status = HTTP::OK; - request->response().entity.extension = request->request().entity.extension; - if (request->hasRequestBody()) { - if (request->request().requestLine.method != HTTP::HEAD) { - if (request->request().entity.contentType.type == "multipart") { - Multipart::ptr requestMultipart = request->requestMultipart(); - Multipart::ptr responseMultipart = request->responseMultipart(); - for (BodyPart::ptr requestPart = requestMultipart->nextPart(); - requestPart; - requestPart = requestMultipart->nextPart()) { - BodyPart::ptr responsePart = responseMultipart->nextPart(); - responsePart->headers() = requestPart->headers(); - transferStream(requestPart->stream(), responsePart->stream()); - responsePart->stream()->close(); - } - responseMultipart->finish(); - } else { - respondStream(request, request->requestStream()); - return; - } - } else { - request->finish(); - } - } else { - request->response().entity.contentLength = 0; - request->finish(); - } - } else { - respondError(request, HTTP::METHOD_NOT_ALLOWED); - } -} + try { + IOManager ioManager; -void httpServer(Socket::ptr listen) -{ - listen->listen(); + std::vector addresses = Address::lookup("localhost:8080"); + Socket::ptr socket = addresses[0]->createSocket(ioManager, SOCK_STREAM); + socket->bind(addresses[0]); + socket->listen(); - while (true) { - Socket::ptr socket = listen->accept(); - Stream::ptr stream(new SocketStream(socket)); - HTTP::ServerConnection::ptr conn(new HTTP::ServerConnection(stream, &httpRequest)); - Scheduler::getThis()->schedule(boost::bind(&HTTP::ServerConnection::processRequests, conn)); - } -} + boost::thread serveThread1(socketServer, socket); + boost::thread serveThread2(socketServer, socket); + boost::thread serveThread3(socketServer, socket); + boost::thread serveThread4(socketServer, socket); -void startHttpServer(IOManager &ioManager) -{ - std::vector addresses = Address::lookup("localhost:80"); + serveThread1.join(); + serveThread2.join(); + serveThread3.join(); + serveThread4.join(); - for (std::vector::const_iterator it(addresses.begin()); - it != addresses.end(); - ++it) { - Socket::ptr s = (*it)->createSocket(ioManager, SOCK_STREAM); - s->bind(*it); - Scheduler::getThis()->schedule(boost::bind(&httpServer, s)); - } -} - -#ifdef WINDOWS -void namedPipeServer(IOManager &ioManager) -{ - while (true) { - NamedPipeStream::ptr stream(new NamedPipeStream("\\\\.\\pipe\\echo", NamedPipeStream::READWRITE, &ioManager)); - stream->accept(); - Scheduler::getThis()->schedule(boost::bind(&streamConnection, stream)); - } -} -#endif - -int run(int argc, char *argv[]) -{ - try { - IOManager ioManager; - startSocketServer(ioManager); - startHttpServer(ioManager); -#ifdef WINDOWS - ioManager.schedule(boost::bind(&namedPipeServer, boost::ref(ioManager))); - ioManager.schedule(boost::bind(&namedPipeServer, boost::ref(ioManager))); -#endif - ioManager.dispatch(); } catch (...) { std::cerr << boost::current_exception_diagnostic_information() << std::endl; return 1; } return 0; } - -MORDOR_MAIN(int argc, char *argv[]) -{ - Config::loadFromEnvironment(); - return Daemon::run(argc, argv, &run); -} diff --git a/mordor/examples/simpleappserver.cpp b/mordor/examples/simpleappserver.cpp index 463aae38..cdfcf6aa 100644 --- a/mordor/examples/simpleappserver.cpp +++ b/mordor/examples/simpleappserver.cpp @@ -4,6 +4,8 @@ #include +#include + #include "mordor/config.h" #include "mordor/http/server.h" #include "mordor/iomanager.h" @@ -41,24 +43,36 @@ static void httpRequest(HTTP::ServerRequest::ptr request) } } +void serve(Socket::ptr listen) +{ + IOManager ioManager; + + while (true) + { + Socket::ptr socket = listen->accept(&ioManager); + Stream::ptr stream(new SocketStream(socket)); + HTTP::ServerConnection::ptr conn(new HTTP::ServerConnection(stream, + &httpRequest)); + conn->processRequests(); + } +} + MORDOR_MAIN(int argc, char *argv[]) { try { Config::loadFromEnvironment(); IOManager ioManager; - Socket s(ioManager, AF_INET, SOCK_STREAM); + Socket::ptr socket = Socket::ptr(new Socket(ioManager, AF_INET, SOCK_STREAM)); IPv4Address address(INADDR_ANY, 80); - s.bind(address); - s.listen(); + socket->bind(address); + socket->listen(); + + boost::thread serveThread1(serve, socket); + boost::thread serveThread2(serve, socket); + serveThread1.join(); + serveThread2.join(); - while (true) { - Socket::ptr socket = s.accept(); - Stream::ptr stream(new SocketStream(socket)); - HTTP::ServerConnection::ptr conn(new HTTP::ServerConnection(stream, - &httpRequest)); - conn->processRequests(); - } } catch (...) { std::cerr << boost::current_exception_diagnostic_information() << std::endl; } diff --git a/mordor/examples/simplehttpfileserver.cpp b/mordor/examples/simplehttpfileserver.cpp new file mode 100644 index 00000000..08c8c84b --- /dev/null +++ b/mordor/examples/simplehttpfileserver.cpp @@ -0,0 +1,92 @@ +#include "mordor/predef.h" + +#include + +#include + +#include "mordor/config.h" +#include "mordor/http/server.h" +#include "mordor/iomanager.h" +#include "mordor/main.h" +#include "mordor/socket.h" +#include "mordor/streams/file.h" +#include "mordor/streams/socket.h" +#include "mordor/streams/transfer.h" +#include "mordor/streams/ssl.h" + +using namespace Mordor; + +static void httpRequest(HTTP::ServerRequest::ptr request) +{ + const std::string &method = request->request().requestLine.method; + const URI &uri = request->request().requestLine.uri; + + if (method == HTTP::GET) { + FileStream::ptr stream(new FileStream(uri.path.toString(), FileStream::READ, FileStream::OPEN, static_cast(Scheduler::getThis()), Scheduler::getThis())); + HTTP::respondStream(request, stream); + } else { + HTTP::respondError(request, HTTP::METHOD_NOT_ALLOWED); + } +} + +void serve(Socket::ptr listen, bool ssl) +{ + IOManager ioManager; + SSL_CTX* ssl_ctx = NULL; + + if (ssl) + ssl_ctx = SSLStream::createSSLCTX(); + + while (true) + { + Socket::ptr socket = listen->accept(&ioManager); + SocketStream::ptr socketStream(new SocketStream(socket)); + + Stream::ptr stream; + + if (ssl) + { + SSLStream::ptr sslStream(new SSLStream(socketStream, false, true, ssl_ctx)); + sslStream->accept(); + stream = sslStream; + } + else + { + stream = socketStream; + } + + HTTP::ServerConnection::ptr conn(new HTTP::ServerConnection(stream, &httpRequest)); + Scheduler::getThis()->schedule(boost::bind(&HTTP::ServerConnection::processRequests, conn)); + } +} + +MORDOR_MAIN(int argc, char *argv[]) +{ + try { + Config::loadFromEnvironment(); + IOManager ioManager; + + Socket::ptr httpSocket = Socket::ptr(new Socket(ioManager, AF_INET, SOCK_STREAM)); + IPv4Address httpAddress(INADDR_ANY, 8080); + httpSocket->bind(httpAddress); + httpSocket->listen(); + + Socket::ptr httpsSocket = Socket::ptr(new Socket(ioManager, AF_INET, SOCK_STREAM)); + IPv4Address httpsAddress(INADDR_ANY, 8443); + httpsSocket->bind(httpsAddress); + httpsSocket->listen(); + + boost::thread serveThread1(serve, httpSocket, false); + boost::thread serveThread2(serve, httpSocket, false); + boost::thread serveThread3(serve, httpsSocket, true); + boost::thread serveThread4(serve, httpsSocket, true); + serveThread1.join(); + serveThread2.join(); + serveThread3.join(); + serveThread4.join(); + + } catch (...) { + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + } + return 0; +} diff --git a/mordor/examples/simplehttpfileuploadserver.cpp b/mordor/examples/simplehttpfileuploadserver.cpp new file mode 100644 index 00000000..6dbec243 --- /dev/null +++ b/mordor/examples/simplehttpfileuploadserver.cpp @@ -0,0 +1,99 @@ +#include "mordor/predef.h" + +#include + +#include +#include + +#include "mordor/config.h" +#include "mordor/http/server.h" +#include "mordor/iomanager.h" +#include "mordor/main.h" +#include "mordor/socket.h" +#include "mordor/streams/file.h" +#include "mordor/streams/socket.h" +#include "mordor/streams/transfer.h" +#include "mordor/streams/ssl.h" +#include "mordor/streams/null.h" + +using namespace Mordor; + + +static volatile int counter = 0; + +static void httpRequest(HTTP::ServerRequest::ptr request) +{ + try { + const std::string &method = request->request().requestLine.method; + if (method == HTTP::POST) { + request->response().entity.contentLength = 0; + request->response().status.status = HTTP::OK; + if (request->hasRequestBody()) { + FileStream::ptr fileStream(new FileStream("/tmp/" + boost::lexical_cast(__sync_fetch_and_add(&counter, 1)), FileStream::WRITE, FileStream::OPEN_OR_CREATE, static_cast(Scheduler::getThis()), Scheduler::getThis())); + transferStreamDirect(request->requestStream(), fileStream); + } + } else { + respondError(request, HTTP::METHOD_NOT_ALLOWED); + } + + } catch (...) { + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + } +} + +void serve(Socket::ptr listen, bool ssl) +{ + IOManager ioManager; + SSL_CTX* ssl_ctx = NULL; + + if (ssl) + ssl_ctx = SSLStream::createSSLCTX(); + + while (true) + { + Socket::ptr socket = listen->accept(&ioManager); + SocketStream::ptr socketStream(new SocketStream(socket)); + + Stream::ptr stream; + + if (ssl) + { + SSLStream::ptr sslStream(new SSLStream(socketStream, false, true, ssl_ctx)); + sslStream->accept(); + stream = sslStream; + } + else + { + stream = socketStream; + } + + HTTP::ServerConnection::ptr conn(new HTTP::ServerConnection(stream, &httpRequest)); + Scheduler::getThis()->schedule(boost::bind(&HTTP::ServerConnection::processRequests, conn)); + } +} + +MORDOR_MAIN(int argc, char *argv[]) +{ + try { + Config::loadFromEnvironment(); + IOManager ioManager; + + Socket::ptr httpSocket = Socket::ptr(new Socket(ioManager, AF_INET, SOCK_STREAM)); + IPv4Address httpAddress(INADDR_ANY, 8080); + httpSocket->bind(httpAddress); + httpSocket->listen(); + + boost::thread serveThread1(serve, httpSocket, false); + boost::thread serveThread2(serve, httpSocket, false); + boost::thread serveThread3(serve, httpSocket, false); + boost::thread serveThread4(serve, httpSocket, false); + serveThread1.join(); + serveThread2.join(); + serveThread3.join(); + serveThread4.join(); + + } catch (...) { + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + } + return 0; +} diff --git a/mordor/examples/simplehttpserver.cpp b/mordor/examples/simplehttpserver.cpp new file mode 100644 index 00000000..0055403a --- /dev/null +++ b/mordor/examples/simplehttpserver.cpp @@ -0,0 +1,97 @@ +#include "mordor/predef.h" + +#include +#include + +#include "mordor/config.h" +#include "mordor/daemon.h" +#include "mordor/http/multipart.h" +#include "mordor/http/server.h" +#include "mordor/iomanager.h" +#include "mordor/main.h" +#include "mordor/socket.h" +#include "mordor/streams/socket.h" +#include "mordor/streams/transfer.h" + +using namespace Mordor; + + +void httpRequest(HTTP::ServerRequest::ptr request) +{ + const std::string &method = request->request().requestLine.method; + if (method == HTTP::GET || method == HTTP::HEAD || method == HTTP::PUT || + method == HTTP::POST) { + request->response().entity.contentLength = request->request().entity.contentLength; + request->response().entity.contentType = request->request().entity.contentType; + request->response().general.transferEncoding = request->request().general.transferEncoding; + request->response().status.status = HTTP::OK; + request->response().entity.extension = request->request().entity.extension; + if (request->hasRequestBody()) { + if (request->request().requestLine.method != HTTP::HEAD) { + if (request->request().entity.contentType.type == "multipart") { + Multipart::ptr requestMultipart = request->requestMultipart(); + Multipart::ptr responseMultipart = request->responseMultipart(); + for (BodyPart::ptr requestPart = requestMultipart->nextPart(); + requestPart; + requestPart = requestMultipart->nextPart()) { + BodyPart::ptr responsePart = responseMultipart->nextPart(); + responsePart->headers() = requestPart->headers(); + transferStream(requestPart->stream(), responsePart->stream()); + responsePart->stream()->close(); + } + responseMultipart->finish(); + } else { + respondStream(request, request->requestStream()); + return; + } + } else { + request->finish(); + } + } else { + request->response().entity.contentLength = 0; + request->finish(); + } + } else { + respondError(request, HTTP::METHOD_NOT_ALLOWED); + } +} + +void httpServer(Socket::ptr listen) +{ + IOManager ioManager; + + while (true) + { + Socket::ptr socket = listen->accept(&ioManager); + Stream::ptr stream(new SocketStream(socket)); + HTTP::ServerConnection::ptr conn(new HTTP::ServerConnection(stream, &httpRequest)); + Scheduler::getThis()->schedule(boost::bind(&HTTP::ServerConnection::processRequests, conn)); + } +} + +int main(int argc, char *argv[]) +{ + try { + IOManager ioManager; + + std::vector addresses = Address::lookup("localhost:80"); + Socket::ptr socket = addresses[0]->createSocket(ioManager, SOCK_STREAM); + socket->bind(addresses[0]); + socket->listen(); + + boost::thread serveThread1(httpServer, socket); + boost::thread serveThread2(httpServer, socket); + boost::thread serveThread3(httpServer, socket); + boost::thread serveThread4(httpServer, socket); + + serveThread1.join(); + serveThread2.join(); + serveThread3.join(); + serveThread4.join(); + + } catch (...) { + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + return 1; + } + return 0; +} diff --git a/mordor/examples/sslterminator.cpp b/mordor/examples/sslterminator.cpp new file mode 100755 index 00000000..b086d062 --- /dev/null +++ b/mordor/examples/sslterminator.cpp @@ -0,0 +1,101 @@ +#include "mordor/predef.h" + +#include +#include + +#include "mordor/config.h" +#include "mordor/daemon.h" +#include "mordor/iomanager.h" +#include "mordor/main.h" +#include "mordor/socket.h" +#include "mordor/streams/socket.h" +#include "mordor/streams/ssl.h" + +using namespace Mordor; + + +void sslTerminatorHandleIO(Stream::ptr from, Stream::ptr to) +{ + try + { + int size = 4096; + Buffer buffer; + buffer.reserve(size); + + while (true) + { + size_t len = from->read(buffer, size); + if (!len) + return; + to->write (buffer, len); + } + } catch (...) { + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + } +} + +void sslTerminatorHandler(Stream::ptr client, Address::ptr address) +{ + Scheduler* scheduler = Scheduler::getThis(); + + IOManager* ioManager = static_cast(scheduler); + Socket::ptr socket = address->createSocket(*ioManager, SOCK_STREAM); + socket->connect(address); + + SocketStream::ptr socketStream(new SocketStream(socket)); + + scheduler->schedule(boost::bind(&sslTerminatorHandleIO, client, socketStream)); + scheduler->schedule(boost::bind(&sslTerminatorHandleIO, socketStream, client)); +} + +void serve(Socket::ptr listen, Address::ptr address) +{ + IOManager ioManager; + + SSL_CTX* ssl_ctx = SSLStream::createSSLCTX(); + + while (true) { + try + { + Socket::ptr socket = listen->accept(&ioManager); + SocketStream::ptr socketStream(new SocketStream(socket)); + + SSLStream::ptr sslStream(new SSLStream(socketStream, false, true, ssl_ctx)); + sslStream->accept(); + + Scheduler::getThis()->schedule(boost::bind(&sslTerminatorHandler, sslStream, address)); + } catch (...) { + //TODO::for now we ignore from any exception that occurred during connection handling + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + } + } +} + +int main(int argc, char *argv[]) +{ + try { + IOManager ioManager; + + std::vector addresses = Address::lookup("localhost:8443"); + Socket::ptr socket = addresses[0]->createSocket(ioManager, SOCK_STREAM); + socket->bind(addresses[0]); + socket->listen(); + + std::vector backendAddresses = Address::lookup("localhost:8080"); + + boost::thread serveThread1(serve, socket, backendAddresses[0]); + boost::thread serveThread2(serve, socket, backendAddresses[0]); + boost::thread serveThread3(serve, socket, backendAddresses[0]); + boost::thread serveThread4(serve, socket, backendAddresses[0]); + + serveThread1.join(); + serveThread2.join(); + serveThread3.join(); + serveThread4.join(); + + } catch (...) { + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + return 1; + } + return 0; +} diff --git a/mordor/fiber.cpp b/mordor/fiber.cpp index d80e76a7..26bb45ca 100644 --- a/mordor/fiber.cpp +++ b/mordor/fiber.cpp @@ -3,12 +3,14 @@ #include "fiber.h" #include +#include #include "assert.h" #include "config.h" #include "exception.h" #include "statistics.h" #include "version.h" +#include "scheduler.h" #ifdef WINDOWS #include @@ -406,9 +408,13 @@ Fiber::allocStack() VirtualAlloc((char*)m_stack + g_pagesize, m_stacksize, MEM_COMMIT, PAGE_READWRITE); m_sp = (char*)m_stack + m_stacksize + g_pagesize; #elif defined(POSIX) - m_stack = mmap(NULL, m_stacksize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); - if (m_stack == MAP_FAILED) - MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("mmap"); + m_stack = Scheduler::t_stacks_pool->pop(); + if (!m_stack) + { + m_stack = mmap(NULL, m_stacksize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); + if (m_stack == MAP_FAILED) + MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("mmap"); + } #if defined(VALGRIND) && (defined(LINUX) || defined(OSX)) m_valgrindStackId = VALGRIND_STACK_REGISTER(m_stack, (char *)m_stack + m_stacksize); #endif @@ -429,7 +435,8 @@ Fiber::freeStack() #if defined(VALGRIND) && (defined(LINUX) || defined(OSX)) VALGRIND_STACK_DEREGISTER(m_valgrindStackId); #endif - munmap(m_stack, m_stacksize); + if (!Scheduler::t_stacks_pool->push(m_stack)) + munmap(m_stack, m_stacksize); #endif } diff --git a/mordor/iomanager_epoll.cpp b/mordor/iomanager_epoll.cpp index d73ab99a..3977076c 100644 --- a/mordor/iomanager_epoll.cpp +++ b/mordor/iomanager_epoll.cpp @@ -102,7 +102,6 @@ IOManager::AsyncState::AsyncState() IOManager::AsyncState::~AsyncState() { - boost::mutex::scoped_lock lock(m_mutex); MORDOR_ASSERT(!m_events); } @@ -144,8 +143,6 @@ IOManager::AsyncState::asyncResetContextFiber(Fiber::ptr fiber) // However, it is needed to acquire the lock and then unlock // to ensure that this function is executed after the other // fiber which scheduled this async reset call. - boost::mutex::scoped_lock lock(m_mutex); - lock.unlock(); fiber.reset(); } @@ -251,7 +248,6 @@ IOManager::registerEvent(int fd, Event event, boost::function dg) MORDOR_ASSERT(event == READ || event == WRITE || event == CLOSE); // Look up our state in the global map, expanding it if necessary - boost::mutex::scoped_lock lock(m_mutex); if (m_pendingEvents.size() < (size_t)fd) m_pendingEvents.resize(fd * 3 / 2); if (!m_pendingEvents[fd - 1]) { @@ -260,9 +256,6 @@ IOManager::registerEvent(int fd, Event event, boost::function dg) } AsyncState &state = *m_pendingEvents[fd - 1]; MORDOR_ASSERT(fd == state.m_fd); - lock.unlock(); - - boost::mutex::scoped_lock lock2(state.m_mutex); MORDOR_ASSERT(!(state.m_events & event)); int op = state.m_events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; @@ -294,16 +287,13 @@ IOManager::unregisterEvent(int fd, Event event) MORDOR_ASSERT(fd > 0); MORDOR_ASSERT(event == READ || event == WRITE || event == CLOSE); - boost::mutex::scoped_lock lock(m_mutex); if (m_pendingEvents.size() < (size_t)fd) return false; if (!m_pendingEvents[fd - 1]) return false; AsyncState &state = *m_pendingEvents[fd - 1]; MORDOR_ASSERT(fd == state.m_fd); - lock.unlock(); - boost::mutex::scoped_lock lock2(state.m_mutex); if (!(state.m_events & event)) return false; @@ -333,16 +323,13 @@ IOManager::cancelEvent(int fd, Event event) MORDOR_ASSERT(fd > 0); MORDOR_ASSERT(event == READ || event == WRITE || event == CLOSE); - boost::mutex::scoped_lock lock(m_mutex); if (m_pendingEvents.size() < (size_t)fd) return false; if (!m_pendingEvents[fd - 1]) return false; AsyncState &state = *m_pendingEvents[fd - 1]; MORDOR_ASSERT(fd == state.m_fd); - lock.unlock(); - boost::mutex::scoped_lock lock2(state.m_mutex); if (!(state.m_events & event)) return false; @@ -416,7 +403,6 @@ IOManager::idle() AsyncState &state = *(AsyncState *)event.data.ptr; - boost::mutex::scoped_lock lock2(state.m_mutex); MORDOR_LOG_TRACE(g_log) << " epoll_event {" << (EPOLL_EVENTS)event.events << ", " << state.m_fd << "}, registered for " << (EPOLL_EVENTS)state.m_events; diff --git a/mordor/iomanager_epoll.h b/mordor/iomanager_epoll.h index 52ee1117..03c4d1c4 100644 --- a/mordor/iomanager_epoll.h +++ b/mordor/iomanager_epoll.h @@ -45,8 +45,6 @@ class IOManager : public Scheduler, public TimerManager int m_fd; EventContext m_in, m_out, m_close; Event m_events; - boost::mutex m_mutex; - private: void asyncResetContextFiber(boost::shared_ptr); }; @@ -76,7 +74,6 @@ class IOManager : public Scheduler, public TimerManager int m_epfd; int m_tickleFds[2]; size_t m_pendingEventCount; - boost::mutex m_mutex; std::vector m_pendingEvents; }; diff --git a/mordor/iomanager_iocp.cpp b/mordor/iomanager_iocp.cpp index 1817365a..1e51bfa4 100644 --- a/mordor/iomanager_iocp.cpp +++ b/mordor/iomanager_iocp.cpp @@ -54,7 +54,6 @@ IOManager::WaitBlock::registerEvent(HANDLE hEvent, boost::function dg, bool recurring) { - boost::mutex::scoped_lock lock(m_mutex); if (m_inUseCount == -1 || m_inUseCount == MAXIMUM_WAIT_OBJECTS - 1) return false; ++m_inUseCount; @@ -79,7 +78,6 @@ typedef boost::function functor; size_t IOManager::WaitBlock::unregisterEvent(HANDLE handle) { - boost::mutex::scoped_lock lock(m_mutex); if (m_inUseCount == -1) return 0; size_t unregistered = 0; @@ -103,7 +101,6 @@ IOManager::WaitBlock::unregisterEvent(HANDLE handle) MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("ResetEvent"); if (!SetEvent(m_handles[0])) MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("SetEvent"); - lock.unlock(); if (WaitForSingleObject(m_reconfigured, INFINITE) == WAIT_FAILED) MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("WaitForSingleObject"); } @@ -118,7 +115,6 @@ IOManager::WaitBlock::run() HANDLE handles[MAXIMUM_WAIT_OBJECTS]; { - boost::mutex::scoped_lock lock(m_mutex); if (m_inUseCount == -1) { // The first/final handle was unregistered out from under us // before we could even start @@ -138,7 +134,6 @@ IOManager::WaitBlock::run() << "): " << dwRet << " (" << lastError() << ")"; if (dwRet == WAIT_OBJECT_0) { // Array just got reconfigured - boost::mutex::scoped_lock lock(m_mutex); if (!SetEvent(m_reconfigured)) MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("SetEvent"); if (m_inUseCount == -1) @@ -147,7 +142,6 @@ IOManager::WaitBlock::run() memcpy(handles, m_handles, (count) * sizeof(HANDLE)); MORDOR_LOG_DEBUG(g_logWaitBlock) << this << " reconfigure " << count; } else if (dwRet >= WAIT_OBJECT_0 + 1 && dwRet < WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS) { - boost::mutex::scoped_lock lock(m_mutex); if (m_inUseCount == -1) { // The final handle was unregistered out from under us @@ -186,7 +180,6 @@ IOManager::WaitBlock::run() MORDOR_LOG_DEBUG(g_logWaitBlock) << this << " done"; { ptr self = shared_from_this(); - boost::mutex::scoped_lock lock(m_outer.m_mutex); std::list::iterator it = std::find(m_outer.m_waitBlocks.begin(), m_outer.m_waitBlocks.end(), shared_from_this()); @@ -267,7 +260,6 @@ IOManager::registerEvent(AsyncEvent *e) MORDOR_LOG_DEBUG(g_log) << this << " registerEvent(" << &e->overlapped << ")"; #ifndef NDEBUG { - boost::mutex::scoped_lock lock(m_mutex); MORDOR_ASSERT(m_pendingEvents.find(&e->overlapped) == m_pendingEvents.end()); m_pendingEvents[&e->overlapped] = e; #endif @@ -288,7 +280,6 @@ IOManager::unregisterEvent(AsyncEvent *e) e->m_fiber.reset(); #ifndef NDEBUG { - boost::mutex::scoped_lock lock(m_mutex); std::map::iterator it = m_pendingEvents.find(&e->overlapped); MORDOR_ASSERT(it != m_pendingEvents.end()); @@ -310,7 +301,6 @@ IOManager::registerEvent(HANDLE handle, boost::function dg, bool recurr MORDOR_ASSERT(handle != INVALID_HANDLE_VALUE); MORDOR_ASSERT(Scheduler::getThis()); - boost::mutex::scoped_lock lock(m_mutex); for (std::list::iterator it = m_waitBlocks.begin(); it != m_waitBlocks.end(); ++it) { @@ -326,7 +316,6 @@ size_t IOManager::unregisterEvent(HANDLE handle) { MORDOR_ASSERT(handle); - boost::mutex::scoped_lock lock(m_mutex); size_t result = 0; for (std::list::iterator it = m_waitBlocks.begin(); it != m_waitBlocks.end(); @@ -377,7 +366,6 @@ IOManager::stopping(unsigned long long &nextTimeout) if (nextTimeout == ~0ull && Scheduler::stopping()) { if (m_pendingEventCount != 0) return false; - boost::mutex::scoped_lock lock(m_mutex); return m_waitBlocks.empty(); } return false; @@ -427,7 +415,6 @@ IOManager::idle() expired.clear(); #ifndef NDEBUG - boost::mutex::scoped_lock lock(m_mutex, boost::defer_lock_t()); #endif int tickles = 0; for (ULONG i = 0; i < count; ++i) { diff --git a/mordor/iomanager_iocp.h b/mordor/iomanager_iocp.h index 8089a8d4..877a1fc8 100644 --- a/mordor/iomanager_iocp.h +++ b/mordor/iomanager_iocp.h @@ -5,7 +5,6 @@ #include #include -#include #include "scheduler.h" #include "timer.h" @@ -51,7 +50,6 @@ class IOManager : public Scheduler, public TimerManager void removeEntry(int index); private: - boost::mutex m_mutex; IOManager &m_outer; HANDLE m_reconfigured; HANDLE m_handles[MAXIMUM_WAIT_OBJECTS]; @@ -92,7 +90,6 @@ class IOManager : public Scheduler, public TimerManager std::map m_pendingEvents; #endif size_t m_pendingEventCount; - boost::mutex m_mutex; std::list m_waitBlocks; }; diff --git a/mordor/scheduler.cpp b/mordor/scheduler.cpp index 45fcadac..288940aa 100644 --- a/mordor/scheduler.cpp +++ b/mordor/scheduler.cpp @@ -7,13 +7,18 @@ #include "atomic.h" #include "assert.h" #include "fiber.h" +#include "config.h" + namespace Mordor { static Logger::ptr g_log = Log::lookup("mordor:scheduler"); +static ConfigVar::ptr g_stacksPoolSize = + Config::lookup("stack.poolsize", 1000, "size of stacks pool free list"); ThreadLocalStorage Scheduler::t_scheduler; ThreadLocalStorage Scheduler::t_fiber; +ThreadLocalStorage Scheduler::t_stacks_pool; Scheduler::Scheduler(size_t threads, bool useCaller, size_t batchSize) : m_activeThreadCount(0), @@ -22,7 +27,10 @@ Scheduler::Scheduler(size_t threads, bool useCaller, size_t batchSize) m_autoStop(false), m_batchSize(batchSize) { - MORDOR_ASSERT(threads >= 1); + MORDOR_ASSERT(threads == 1); + + t_stacks_pool = new StacksPool(g_stacksPoolSize->val()); + if (useCaller) { --threads; MORDOR_ASSERT(getThis() == NULL); @@ -55,7 +63,6 @@ void Scheduler::start() { MORDOR_LOG_VERBOSE(g_log) << this << " starting " << m_threadCount << " threads"; - boost::mutex::scoped_lock lock(m_mutex); if (!m_stopping) return; // TODO: There may be a race condition here if one thread calls stop(), @@ -75,7 +82,6 @@ Scheduler::start() bool Scheduler::hasWorkToDo() { - boost::mutex::scoped_lock lock(m_mutex); return !m_fibers.empty(); } @@ -135,7 +141,6 @@ Scheduler::stop() << " waiting for other threads to stop"; std::vector > threads; { - boost::mutex::scoped_lock lock(m_mutex); threads.swap(m_threads); } for (std::vector >::const_iterator it @@ -151,7 +156,6 @@ Scheduler::stop() bool Scheduler::stopping() { - boost::mutex::scoped_lock lock(m_mutex); return m_stopping && m_fibers.empty() && m_activeThreadCount == 0; } @@ -160,7 +164,6 @@ Scheduler::schedule(Fiber::ptr f, tid_t thread) { bool tickleMe; { - boost::mutex::scoped_lock lock(m_mutex); tickleMe = scheduleNoLock(f, thread); } if (tickleMe && Scheduler::getThis() != this) @@ -172,7 +175,6 @@ Scheduler::schedule(boost::function dg, tid_t thread) { bool tickleMe; { - boost::mutex::scoped_lock lock(m_mutex); tickleMe = scheduleNoLock(dg, thread); } if (tickleMe && Scheduler::getThis() != this) @@ -275,7 +277,6 @@ Scheduler::threadCount(size_t threads) MORDOR_ASSERT(threads >= 1); if (m_rootFiber) --threads; - boost::mutex::scoped_lock lock(m_mutex); if (threads == m_threadCount) { return; } else if (threads > m_threadCount) { @@ -323,7 +324,6 @@ Scheduler::run() bool dontIdle = false; bool tickleMe = false; { - boost::mutex::scoped_lock lock(m_mutex); // Kill ourselves off if needed if (m_threads.size() > m_threadCount && gettid() != m_rootThread) { // Accounting @@ -429,7 +429,6 @@ Scheduler::run() MORDOR_LOG_FATAL(Log::root()) << boost::current_exception_diagnostic_information(); { - boost::mutex::scoped_lock lock(m_mutex); std::vector::iterator it2 = it; // push all un-executed fibers back to m_fibers while (++it2 != batch.end()) { diff --git a/mordor/scheduler.h b/mordor/scheduler.h index 068da6ad..365f3002 100644 --- a/mordor/scheduler.h +++ b/mordor/scheduler.h @@ -7,13 +7,50 @@ #include #include #include -#include +#include +#include #include "thread.h" #include "thread_local_storage.h" +#include "config.h" namespace Mordor { +class StacksPool +{ +public: + std::list m_free_list; + int m_pool_size; + int m_max_pool_size; + + + StacksPool(int max_pool_size) : m_pool_size(0), m_max_pool_size (max_pool_size) + { + } + + inline bool push (void* stack) + { + if (m_pool_size >= m_max_pool_size) + return false; + + ++m_pool_size; + m_free_list.push_back (stack); + return true; + } + + inline void* pop () + { + if (!m_pool_size) + return NULL; + + --m_pool_size; + void* stack = m_free_list.back (); + m_free_list.pop_back (); + return stack; + } + +}; + class Fiber; /// Cooperative user-mode thread (Fiber) Scheduler @@ -94,7 +131,6 @@ class Scheduler : public boost::noncopyable { bool tickleMe = false; { - boost::mutex::scoped_lock lock(m_mutex); while (begin != end) { tickleMe = scheduleNoLock(*begin) || tickleMe; ++begin; @@ -188,9 +224,11 @@ class Scheduler : public boost::noncopyable boost::function dg; tid_t thread; }; +public: + static ThreadLocalStorage t_stacks_pool; +private: static ThreadLocalStorage t_scheduler; static ThreadLocalStorage t_fiber; - boost::mutex m_mutex; std::list m_fibers; tid_t m_rootThread; boost::shared_ptr m_rootFiber; diff --git a/mordor/socket.cpp b/mordor/socket.cpp index 7591307c..fa84e475 100644 --- a/mordor/socket.cpp +++ b/mordor/socket.cpp @@ -570,15 +570,18 @@ Socket::listen(int backlog) } Socket::ptr -Socket::accept() +Socket::accept(IOManager *ioManager) { - Socket::ptr sock(new Socket(m_ioManager, m_family, type(), m_protocol, 0)); - accept(*sock.get()); + if (!ioManager) + ioManager = m_ioManager; + + Socket::ptr sock(new Socket(ioManager, m_family, type(), m_protocol, 0)); + accept(*sock.get(), ioManager); return sock; } void -Socket::accept(Socket &target) +Socket::accept(Socket &target, IOManager *ioManager) { #ifdef WINDOWS if (m_useAcceptEx && pAcceptEx && m_ioManager) { @@ -591,7 +594,7 @@ Socket::accept(Socket &target) #endif MORDOR_ASSERT(target.m_family == m_family); MORDOR_ASSERT(target.m_protocol == m_protocol); - if (!m_ioManager) { + if (!ioManager) { socket_t newsock = ::accept(m_sock, NULL, NULL); if (newsock == -1) { MORDOR_LOG_ERROR(g_log) << this << " accept(" << m_sock << "): " @@ -604,37 +607,37 @@ Socket::accept(Socket &target) } else { #ifdef WINDOWS if (m_useAcceptEx && pAcceptEx) { - m_ioManager->registerEvent(&m_receiveEvent); + ioManager->registerEvent(&m_receiveEvent); unsigned char addrs[sizeof(SOCKADDR_STORAGE) * 2 + 16]; DWORD bytes; BOOL ret = pAcceptEx(m_sock, target.m_sock, addrs, 0, sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16, &bytes, &m_receiveEvent.overlapped); if (!ret && lastError() != WSA_IO_PENDING) { if (lastError() == WSAENOTSOCK) { - m_ioManager->unregisterEvent(&m_receiveEvent); + ioManager->unregisterEvent(&m_receiveEvent); // See comment in similar line in connect() goto suckylsp; } MORDOR_LOG_ERROR(g_log) << this << " AcceptEx(" << m_sock << "): (" << lastError() << ")"; - m_ioManager->unregisterEvent(&m_receiveEvent); + ioManager->unregisterEvent(&m_receiveEvent); MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("AcceptEx"); } if (m_skipCompletionPortOnSuccess && ret) { - m_ioManager->unregisterEvent(&m_receiveEvent); + ioManager->unregisterEvent(&m_receiveEvent); m_receiveEvent.overlapped.Internal = STATUS_SUCCESS; } else { if (m_cancelledReceive) { MORDOR_LOG_ERROR(g_log) << this << " AcceptEx(" << m_sock << "): (" << m_cancelledReceive << ")"; - m_ioManager->cancelEvent((HANDLE)m_sock, &m_receiveEvent); + ioManager->cancelEvent((HANDLE)m_sock, &m_receiveEvent); Scheduler::yieldTo(); MORDOR_THROW_EXCEPTION_FROM_ERROR_API(m_cancelledReceive, "AcceptEx"); } Timer::ptr timeout; if (m_receiveTimeout != ~0ull) - timeout = m_ioManager->registerTimer(m_receiveTimeout, boost::bind( - &IOManager::cancelEvent, m_ioManager, (HANDLE)m_sock, &m_receiveEvent)); + timeout = ioManager->registerTimer(m_receiveTimeout, boost::bind( + &IOManager::cancelEvent, ioManager, (HANDLE)m_sock, &m_receiveEvent)); Scheduler::yieldTo(); if (timeout) timeout->cancel(); @@ -661,7 +664,7 @@ Socket::accept(Socket &target) MORDOR_LOG_INFO(g_log) << this << " AcceptEx(" << m_sock << "): " << target.m_sock << " (" << *target.remoteAddress() << ')'; target.setOption(SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &m_sock, sizeof(m_sock)); - target.m_ioManager->registerFile((HANDLE)target.m_sock); + target.ioManager->registerFile((HANDLE)target.m_sock); target.m_skipCompletionPortOnSuccess = !!pSetFileCompletionNotificationModes((HANDLE)target.m_sock, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | @@ -681,20 +684,20 @@ Socket::accept(Socket &target) if (newsock != -1) { // Worked first time } else if (lastError() == WSAEWOULDBLOCK) { - m_ioManager->registerEvent(m_hEvent); + ioManager->registerEvent(m_hEvent); m_fiber = Fiber::getThis(); m_scheduler = Scheduler::getThis(); if (m_cancelledReceive) { MORDOR_LOG_ERROR(g_log) << this << " accept(" << m_sock << "): (" << m_cancelledReceive << ")"; - if (!m_ioManager->unregisterEvent(m_hEvent)) + if (!ioManager->unregisterEvent(m_hEvent)) Scheduler::yieldTo(); MORDOR_THROW_EXCEPTION_FROM_ERROR_API(m_cancelledReceive, "accept"); } m_unregistered = false; Timer::ptr timeout; if (m_receiveTimeout != ~0ull) - timeout = m_ioManager->registerTimer(m_sendTimeout, + timeout = ioManager->registerTimer(m_sendTimeout, boost::bind(&Socket::cancelIo, this, boost::ref(m_cancelledReceive), WSAETIMEDOUT)); Scheduler::yieldTo(); @@ -723,7 +726,7 @@ Socket::accept(Socket &target) MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("accept"); } try { - m_ioManager->registerFile((HANDLE)newsock); + ioManager->registerFile((HANDLE)newsock); } catch(...) { closesocket(newsock); throw; @@ -746,17 +749,19 @@ Socket::accept(Socket &target) error = errno; } while (newsock == -1 && error == EINTR); while (newsock == -1 && error == EAGAIN) { - m_ioManager->registerEvent(m_sock, IOManager::READ); + //ensure we are not already registered (assert will fire) + ioManager->unregisterEvent(m_sock, IOManager::READ); + ioManager->registerEvent(m_sock, IOManager::READ); if (m_cancelledReceive) { MORDOR_LOG_ERROR(g_log) << this << " accept(" << m_sock << "): (" << m_cancelledReceive << ")"; - m_ioManager->cancelEvent(m_sock, IOManager::READ); + ioManager->cancelEvent(m_sock, IOManager::READ); Scheduler::yieldTo(); MORDOR_THROW_EXCEPTION_FROM_ERROR_API(m_cancelledReceive, "accept"); } Timer::ptr timeout; if (m_receiveTimeout != ~0ull) - timeout = m_ioManager->registerTimer(m_receiveTimeout, boost::bind( + timeout = ioManager->registerTimer(m_receiveTimeout, boost::bind( &Socket::cancelIo, this, IOManager::READ, boost::ref(m_cancelledReceive), ETIMEDOUT)); Scheduler::yieldTo(); diff --git a/mordor/socket.h b/mordor/socket.h index 570a70ca..2833e736 100644 --- a/mordor/socket.h +++ b/mordor/socket.h @@ -21,6 +21,7 @@ #include #include #include +#include #ifndef OSX # include # include @@ -93,7 +94,7 @@ class Socket : public boost::enable_shared_from_this, boost::noncopyable { connect(*addr.get()); } void listen(int backlog = SOMAXCONN); - Socket::ptr accept(); + Socket::ptr accept(IOManager *ioManager = NULL); void shutdown(int how = SHUT_RDWR); void getOption(int level, int option, void *result, size_t *len); @@ -153,7 +154,7 @@ class Socket : public boost::enable_shared_from_this, boost::noncopyable size_t doIO(iovec *buffers, size_t length, int &flags, Address *address = NULL); static void callOnRemoteClose(weak_ptr self); void registerForRemoteClose(); - void accept(Socket &target); + void accept(Socket &target, IOManager *ioManager); #ifdef WINDOWS // For WSAEventSelect diff --git a/mordor/streams/buffered.cpp b/mordor/streams/buffered.cpp index 1583211e..7a2b3ff7 100644 --- a/mordor/streams/buffered.cpp +++ b/mordor/streams/buffered.cpp @@ -9,7 +9,7 @@ namespace Mordor { static ConfigVar::ptr g_defaultBufferSize = - Config::lookup("stream.buffered.defaultbuffersize", 65536, + Config::lookup("stream.buffered.defaultbuffersize", 4096, "Default buffer size for new BufferedStreams"); static Logger::ptr g_log = Log::lookup("mordor:streams:buffered"); diff --git a/mordor/streams/ssl.cpp b/mordor/streams/ssl.cpp index a28270d7..12461bcd 100644 --- a/mordor/streams/ssl.cpp +++ b/mordor/streams/ssl.cpp @@ -161,6 +161,20 @@ void add_ext(X509 *cert, int nid, const char *value) X509_EXTENSION_free(ex); } +SSL_CTX* SSLStream::createSSLCTX () +{ + SSL_CTX* ssl_ctx = SSL_CTX_new(SSLv23_server_method()); + + boost::shared_ptr cert; + boost::shared_ptr pkey; + mkcert(cert, pkey, 1024, rand(), 365); + SSL_CTX_use_certificate(ssl_ctx, cert.get()); + SSL_CTX_use_PrivateKey(ssl_ctx, pkey.get()); + + SSL_CTX_set_session_cache_mode (ssl_ctx, SSL_SESS_CACHE_BOTH); + + return ssl_ctx; +} SSLStream::SSLStream(Stream::ptr parent, bool client, bool own, SSL_CTX *ctx) : MutatingFilterStream(parent, own) @@ -205,6 +219,18 @@ SSLStream::SSLStream(Stream::ptr parent, bool client, bool own, SSL_CTX *ctx) SSL_set_bio(m_ssl.get(), m_readBio, m_writeBio); } +SSLStream::~SSLStream() +{ + try + { + SSLStream::close (); + + } catch (...) { + //TODO + std::cerr << boost::current_exception_diagnostic_information() << std::endl; + } + } + void SSLStream::close(CloseType type) { @@ -387,7 +413,8 @@ SSLStream::write(const void *buffer, size_t length) << toWrite << "): " << result << " (" << error << ")"; switch (error) { case SSL_ERROR_NONE: - return result; + flush(false); + return result; case SSL_ERROR_ZERO_RETURN: // Received close_notify message MORDOR_ASSERT(result != 0); diff --git a/mordor/streams/ssl.h b/mordor/streams/ssl.h index b3c3a7eb..c67216ea 100644 --- a/mordor/streams/ssl.h +++ b/mordor/streams/ssl.h @@ -6,6 +6,8 @@ #include +#include + #include #include "buffer.h" @@ -53,6 +55,10 @@ class SSLStream : public MutatingFilterStream public: SSLStream(Stream::ptr parent, bool client = true, bool own = true, SSL_CTX *ctx = NULL); + static SSL_CTX* createSSLCTX (); + + ~SSLStream(); + bool supportsHalfClose() { return false; } void close(CloseType type = BOTH); diff --git a/mordor/streams/transfer.cpp b/mordor/streams/transfer.cpp index 0be7ea23..7f5f5173 100644 --- a/mordor/streams/transfer.cpp +++ b/mordor/streams/transfer.cpp @@ -18,6 +18,11 @@ static ConfigVar::ptr g_chunkSize = Config::lookup("transferstream.chunksize", (size_t)65536, "transfer chunk size."); + +static ConfigVar::ptr g_defaultSize = + Config::lookup("transferstream.defaultsize", 4096, + "Default size for transfer stream"); + static Logger::ptr g_log = Log::lookup("mordor:stream:transfer"); static void readOne(Stream &src, Buffer *&buffer, size_t len, size_t &result) @@ -120,4 +125,21 @@ unsigned long long transferStream(Stream &src, Stream &dst, return totalRead; } +unsigned long long transferStreamDirect(Stream::ptr src, Stream::ptr dst) { + size_t size = g_defaultSize->val(); + char buffer[size]; + size_t total = 0; + + while (true) + { + size_t len = src->read(buffer, size); + total += len; + if (!len) + break; + dst->write (buffer, len); + } + + return total; +} + } diff --git a/mordor/streams/transfer.h b/mordor/streams/transfer.h index 2f702fe6..4dd1e8fb 100644 --- a/mordor/streams/transfer.h +++ b/mordor/streams/transfer.h @@ -16,6 +16,8 @@ enum ExactLength UNTILEOF }; +unsigned long long transferStreamDirect(Stream::ptr src, Stream::ptr dst); + unsigned long long transferStream(Stream &src, Stream &dst, unsigned long long toTransfer = ~0ull, ExactLength exactLength = INFER); diff --git a/mordor/timer.cpp b/mordor/timer.cpp index d51a9d6f..fa5c7f3c 100644 --- a/mordor/timer.cpp +++ b/mordor/timer.cpp @@ -105,7 +105,6 @@ bool Timer::cancel() { MORDOR_LOG_DEBUG(g_log) << this << " cancel"; - boost::mutex::scoped_lock lock(m_manager->m_mutex); if (m_dg) { m_dg = NULL; std::set::iterator it = @@ -120,7 +119,6 @@ Timer::cancel() bool Timer::refresh() { - boost::mutex::scoped_lock lock(m_manager->m_mutex); if (!m_dg) return false; std::set::iterator it = @@ -129,7 +127,6 @@ Timer::refresh() m_manager->m_timers.erase(it); m_next = TimerManager::now() + m_us; m_manager->m_timers.insert(shared_from_this()); - lock.unlock(); MORDOR_LOG_DEBUG(g_log) << this << " refresh"; return true; } @@ -137,7 +134,6 @@ Timer::refresh() bool Timer::reset(unsigned long long us, bool fromNow) { - boost::mutex::scoped_lock lock(m_manager->m_mutex); if (!m_dg) return false; // No change @@ -158,7 +154,6 @@ Timer::reset(unsigned long long us, bool fromNow) bool atFront = (it == m_manager->m_timers.begin()) && !m_manager->m_tickled; if (atFront) m_manager->m_tickled = true; - lock.unlock(); MORDOR_LOG_DEBUG(g_log) << this << " reset to " << m_us; if (atFront) m_manager->onTimerInsertedAtFront(); @@ -173,7 +168,6 @@ TimerManager::TimerManager() TimerManager::~TimerManager() { #ifndef NDEBUG - boost::mutex::scoped_lock lock(m_mutex); MORDOR_ASSERT(m_timers.empty()); #endif } @@ -184,13 +178,11 @@ TimerManager::registerTimer(unsigned long long us, boost::function dg, { MORDOR_ASSERT(dg); Timer::ptr result(new Timer(us, dg, recurring, this)); - boost::mutex::scoped_lock lock(m_mutex); std::set::iterator it = m_timers.insert(result).first; bool atFront = (it == m_timers.begin()) && !m_tickled; if (atFront) m_tickled = true; - lock.unlock(); MORDOR_LOG_DEBUG(g_log) << result.get() << " registerTimer(" << us << ", " << recurring << "): " << atFront; if (atFront) @@ -224,7 +216,6 @@ stubOnTimer( unsigned long long TimerManager::nextTimer() { - boost::mutex::scoped_lock lock(m_mutex); m_tickled = false; if (m_timers.empty()) { MORDOR_LOG_DEBUG(g_log) << this << " nextTimer(): ~0ull"; @@ -268,7 +259,6 @@ TimerManager::processTimers() std::vector > result; unsigned long long nowUs = now(); { - boost::mutex::scoped_lock lock(m_mutex); if (m_timers.empty()) return result; bool rollover = detectClockRollover(nowUs); diff --git a/mordor/timer.h b/mordor/timer.h index 16c6ce43..5289f5e7 100644 --- a/mordor/timer.h +++ b/mordor/timer.h @@ -103,7 +103,6 @@ class TimerManager : public boost::noncopyable static boost::function ms_clockDg; bool detectClockRollover(unsigned long long nowUs); std::set m_timers; - boost::mutex m_mutex; bool m_tickled; unsigned long long m_previousTime; };