diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cf498fe..c00ab6f 100755 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,5 +53,5 @@ jobs: - name: Run test suite run: > - curl --connect-timeout 3 --retry 3 --retry-delay 0 --retry-max-time 30 -sSL https://baltocdn.com/xp-framework/xp-runners/distribution/downloads/e/entrypoint/xp-run-8.6.2.sh > xp-run && - sh xp-run xp.test.Runner src/test/php + curl -sSL https://baltocdn.com/xp-framework/xp-runners/distribution/downloads/e/entrypoint/xp-run-8.6.2.sh > xp-run && + sh xp-run xp.test.Runner -r Dots src/test/php diff --git a/composer.json b/composer.json index 0bb5d91..eb48354 100755 --- a/composer.json +++ b/composer.json @@ -12,7 +12,7 @@ "xp-framework/tokenize": "^9.0 | ^8.0", "xp-forge/json": "^5.0 | ^4.0 | ^3.1", "xp-forge/uri": "^2.0 | ^1.3", - "xp-forge/marshalling": "^1.0", + "xp-forge/marshalling": "^2.0 | ^1.0", "xp-forge/compression": "^1.0", "php": ">=7.0.0" }, diff --git a/src/main/php/webservices/rest/Endpoint.class.php b/src/main/php/webservices/rest/Endpoint.class.php index ac4e0dd..c5d7b1e 100755 --- a/src/main/php/webservices/rest/Endpoint.class.php +++ b/src/main/php/webservices/rest/Endpoint.class.php @@ -9,7 +9,7 @@ use util\data\Marshalling; use util\log\Traceable; use util\{URI, Authority}; -use webservices\rest\io\{Buffered, Reader, Streamed, Traced, Transmission}; +use webservices\rest\io\{Buffered, Reader, Transfer, Streamed, Transmission}; /** * Entry point class @@ -64,12 +64,23 @@ public function __construct($base, Formats $formats= null, $compressing= null) { ; $this->formats= $formats ?: Formats::defaults(); - $this->transfer= new Streamed($this); + $this->transfer= new Streamed(); $this->marshalling= new Marshalling(); $this->connections= function($uri) { return new HttpConnection($uri); }; $this->compressing($compressing ?? Compression::algorithms()->supported()); } + /** + * Use given transfer method, which defaults to streaming. + * + * @param webservices.rest.io.Transfer $transfer + * @return self + */ + public function using(Transfer $transfer) { + $this->transfer= $transfer; + return $this; + } + /** * Use buffering for sending requests; ensuring they have a "Content-Length" * header. This will be slower, especially for big requests, but is more @@ -80,7 +91,7 @@ public function __construct($base, Formats $formats= null, $compressing= null) { * @return self */ public function buffered() { - $this->transfer= new Buffered($this); + $this->transfer= new Buffered(); return $this; } @@ -169,13 +180,7 @@ public function resource($path, $segments= []) { * @return void */ public function setTrace($cat) { - if (null === $cat) { - $this->transfer= $this->transfer->untraced(); - } else if ($this->transfer instanceof Traced) { - $this->transfer->use($cat); - } else { - $this->transfer= new Traced($this->transfer, $cat); - } + $this->cat= $cat; } /** @@ -207,7 +212,8 @@ public function open(RestRequest $request) { $s->setTarget($target->path()); $s->addHeaders($headers); $s->setParameters($request->parameters()); - return $this->transfer->transmission($conn, $s, $target); + + return new Transmission($conn, $s, $target, $this->transfer, $this->cat); } /** @@ -220,9 +226,14 @@ public function open(RestRequest $request) { public function finish(Transmission $transmission) { try { $r= $transmission->finish(); - $output= $this->formats->named($r->header('Content-Type')[0] ?? null); - $reader= $this->transfer->reader($r, $output, $this->marshalling); - return new RestResponse($r->statusCode(), $r->message(), $r->headers(), $reader, $transmission->target); + $in= $transmission->stream($r); + return new RestResponse( + $r->statusCode(), + $r->message(), + $r->headers(), + new Reader($in, $this->formats->named($r->header('Content-Type')[0] ?? null), $this->marshalling), + $transmission->target + ); } catch (Throwable $e) { throw new RestException('Cannot send request', $e); } @@ -236,7 +247,15 @@ public function finish(Transmission $transmission) { * @throws webservices.rest.RestException */ public function execute(RestRequest $request) { - $input= $this->formats->named($request->header('Content-Type')); - return $this->transfer->writer($request, $input, $this->marshalling); + $transmission= $this->open($request); + + if ($payload= $request->payload()) { + $transmission->transmit( + $this->marshalling->marshal($payload->value()), + $this->formats->named($request->header('Content-Type')) + ); + } + + return $this->finish($transmission); } } \ No newline at end of file diff --git a/src/main/php/webservices/rest/TestCall.class.php b/src/main/php/webservices/rest/TestCall.class.php index 7e8b2e0..ee2d5e8 100644 --- a/src/main/php/webservices/rest/TestCall.class.php +++ b/src/main/php/webservices/rest/TestCall.class.php @@ -5,7 +5,7 @@ use webservices\rest\io\{Reader, Transmission}; class TestCall extends Transmission { - private $formats, $marshalling; + private $request, $formats, $marshalling; public $transfer= null; /** Creates a new call */ diff --git a/src/main/php/webservices/rest/io/Buffered.class.php b/src/main/php/webservices/rest/io/Buffered.class.php index a353f87..878efca 100755 --- a/src/main/php/webservices/rest/io/Buffered.class.php +++ b/src/main/php/webservices/rest/io/Buffered.class.php @@ -4,12 +4,33 @@ class Buffered extends Transfer { - public function headers($length) { return ['Content-Length' => $length]; } + /** + * Start buffering + * + * @param peer.http.HttpConnection $conn + * @param peer.http.HttpRequest $request + * @return io.streams.OutputStream + */ + public function start($conn, $request) { + return new MemoryOutputStream(); + } + + /** + * Finish buffering + * + * @param peer.http.HttpConnection $conn + * @param peer.http.HttpRequest $request + * @param io.streams.OutputStream $output + * @return peer.http.HttpResponse + */ + public function finish($conn, $request, $output) { + if (null === $output) return $conn->send($request); + + $bytes= $output->bytes(); + $request->setHeader('Content-Length', strlen($bytes)); - public function stream($request, $format, $value) { - $bytes= $format->serialize($value, new MemoryOutputStream())->bytes(); - $stream= $this->endpoint->open($request->with(['Content-Length' => strlen($bytes)])); + $stream= $conn->open($request); $stream->write($bytes); - return $stream; + return $conn->finish($stream); } } \ No newline at end of file diff --git a/src/main/php/webservices/rest/io/Streamed.class.php b/src/main/php/webservices/rest/io/Streamed.class.php index 29f9f43..781a736 100755 --- a/src/main/php/webservices/rest/io/Streamed.class.php +++ b/src/main/php/webservices/rest/io/Streamed.class.php @@ -1,16 +1,29 @@ [], - 'Transfer-Encoding' => ['chunked'], - ]; - public function headers($length) { return self::HEADERS; } + /** + * Start streaming + * + * @param peer.http.HttpConnection $conn + * @param peer.http.HttpRequest $request + * @return io.streams.OutputStream + */ + public function start($conn, $request) { + $request->setHeader('Transfer-Encoding', 'chunked'); + $request->setHeader('Content-Length', null); + return $conn->open($request); + } - public function stream($request, $format, $value) { - $stream= $this->endpoint->open($request->with(self::HEADERS)); - $format->serialize($value, $stream); - return $stream; + /** + * Finish streaming + * + * @param peer.http.HttpConnection $conn + * @param peer.http.HttpRequest $request + * @param io.streams.OutputStream $output + * @return peer.http.HttpResponse + */ + public function finish($conn, $request, $output) { + return $output ? $conn->finish($output) : $conn->send($request); } } \ No newline at end of file diff --git a/src/main/php/webservices/rest/io/Traced.class.php b/src/main/php/webservices/rest/io/Traced.class.php deleted file mode 100755 index 54484ba..0000000 --- a/src/main/php/webservices/rest/io/Traced.class.php +++ /dev/null @@ -1,88 +0,0 @@ -untraced= $untraced; - $this->cat= $cat; - } - - /** @return parent */ - public function untraced() { return $this->untraced; } - - /** - * Use another logging category - * - * @param util.log.LogCategory $cat - * @return self - */ - public function use($cat) { - $this->cat= $cat; - return $this; - } - - public function transmission($conn, $s, $target) { - return new class($conn, $s, $target, $this->cat) extends Transmission { - private $cat; - private $transferred= 0; - - public function __construct($conn, $request, $target, $cat) { - parent::__construct($conn, $request, $target); - $this->cat= $cat; - } - - public function start() { - $this->cat->info('>>>', substr($this->request->getHeaderString(), 0, -2)); - $this->output= $this->conn->open($this->request); - } - - public function write($bytes) { - $this->transferred+= strlen($bytes); - return parent::write($bytes); - } - - public function finish() { - if (null === $this->output) { - $this->cat->info('>>>', substr($this->request->getHeaderString(), 0, -2)); - return $this->conn->send($this->request); - } else { - $this->cat->debug("({$this->transferred} bytes transferred)"); - return $this->conn->finish($this->output); - } - } - }; - } - - public function writer($request, $format, $marshalling) { - if ($payload= $request->payload()) { - $bytes= $format->serialize($marshalling->marshal($payload->value()), new MemoryOutputStream())->bytes(); - $stream= $this->untraced->endpoint->open($request->with($this->untraced->headers(strlen($bytes)))); - $stream->start(); - - // Include complete payload in debug trace (before sending it). - $this->cat->debug($bytes); - $stream->write($bytes); - } else { - $stream= $this->untraced->endpoint->open($request); - } - - return $this->untraced->endpoint->finish($stream); - } - - public function reader($response, $format, $marshalling) { - $this->cat->info('<<<', substr($response->getHeaderString(), 0, -2)); - $bytes= Streams::readAll($response->in()); - $this->cat->debug($bytes); - - return new Reader(new MemoryInputStream($bytes), $format, $marshalling); - } -} \ No newline at end of file diff --git a/src/main/php/webservices/rest/io/Transfer.class.php b/src/main/php/webservices/rest/io/Transfer.class.php index ce7344e..0aaf2d0 100755 --- a/src/main/php/webservices/rest/io/Transfer.class.php +++ b/src/main/php/webservices/rest/io/Transfer.class.php @@ -1,46 +1,23 @@ endpoint= $endpoint; } - - /** @return self */ - public function untraced() { return $this; } - - public function transmission($conn, $s, $target) { - return new Transmission($conn, $s, $target); - } - - public function headers($length) { - throw new MethodNotImplementedException(__METHOD__); - } - - public function stream($request, $format, $payload) { - throw new MethodNotImplementedException(__METHOD__); - } - - public function writer($request, $format, $marshalling) { - if ($payload= $request->payload()) { - $stream= $this->stream($request, $format, $marshalling->marshal($payload->value())); - } else { - $stream= $this->endpoint->open($request); - } - - return $this->endpoint->finish($stream); - } - - public function reader($response, $format, $marshalling) { - if ($encoding= $response->header('Content-Encoding')) { - $in= Compression::named($encoding[0])->open($response->in()); - } else { - $in= $response->in(); - } - return new Reader($in, $format, $marshalling); - } + /** + * Start transfer + * + * @param peer.http.HttpConnection $conn + * @param peer.http.HttpRequest $request + * @return io.streams.OutputStream + */ + public abstract function start($conn, $request); + + /** + * Finish transfer + * + * @param peer.http.HttpConnection $conn + * @param peer.http.HttpRequest $request + * @param io.streams.OutputStream $output + * @return peer.http.HttpResponse + */ + public abstract function finish($conn, $request, $output); } \ No newline at end of file diff --git a/src/main/php/webservices/rest/io/Transmission.class.php b/src/main/php/webservices/rest/io/Transmission.class.php index 5514bf9..002bde6 100755 --- a/src/main/php/webservices/rest/io/Transmission.class.php +++ b/src/main/php/webservices/rest/io/Transmission.class.php @@ -1,10 +1,10 @@ conn= $conn; $this->request= $request; $this->target= $target; + $this->transfer= $transfer; + $this->cat= $cat; } /** @return peer.http.HttpConnection */ public function connection() { return $this->conn; } + /** + * Transmit payload in a given format + * + * @param var $payload + * @param webservices.rest.format.Format $format + */ + public function transmit($payload, $format) { + $this->start(); + + // Include complete payload in debug trace (before sending it). + if ($this->cat) { + $bytes= $format->serialize($payload, new MemoryOutputStream())->bytes(); + $this->cat->debug($bytes); + $this->write($bytes); + } else { + $format->serialize($payload, $this); + } + } + /** @return void */ - public function start() { - $this->output= $this->conn->open($this->request); + private function start() { + $this->output= $this->transfer->start($this->conn, $this->request); + $this->cat && $this->cat->info('>>>', substr($this->request->getHeaderString(), 0, -2)); + $this->transferred= 0; } /** @@ -36,7 +61,11 @@ public function start() { */ public function write($bytes) { $this->output ?? $this->start(); - return $this->output->write($bytes); + $this->output->write($bytes); + + $l= strlen($bytes); + $this->transferred+= $l; + return $l; } /** @return void */ @@ -51,6 +80,34 @@ public function close() { /** @return peer.http.HttpResponse */ public function finish() { - return $this->output ? $this->conn->finish($this->output) : $this->conn->send($this->request); + if (null === $this->output) { + $this->cat && $this->cat->info('>>>', substr($this->request->getHeaderString(), 0, -2)); + } else { + $this->cat && $this->cat->debug("({$this->transferred} bytes transferred)"); + } + + $response= $this->transfer->finish($this->conn, $this->request, $this->output); + $this->cat && $this->cat->info('<<<', substr($response->getHeaderString(), 0, -2)); + return $response; + } + + /** + * Returns a stream + * + * @param peer.http.HttpResponse $response + * @return io.streams.InputStream + */ + public function stream($response) { + if ($encoding= $response->header('Content-Encoding')) { + $in= Compression::named($encoding[0])->open($response->in()); + } else { + $in= $response->in(); + } + if (null === $this->cat) return $in; + + // Include complete payload in debug trace (before returning it). + $content= Streams::readAll($in); + $this->cat->debug($content); + return new MemoryInputStream($content); } } \ No newline at end of file