Skip to content

Commit

Permalink
Merge pull request #18 from xp-forge/feature/event-streams
Browse files Browse the repository at this point in the history
Support AWS event streams (`application/vnd.amazon.eventstream`)
  • Loading branch information
thekid authored Aug 18, 2024
2 parents 368bc37 + ddb4ac3 commit c005462
Show file tree
Hide file tree
Showing 7 changed files with 468 additions and 1 deletion.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,36 @@ try {
}
```

Streaming responses from Bedrock AI models
------------------------------------------
See https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html:

```php
use com\amazon\aws\{ServiceEndpoint, CredentialProvider};
use util\cmd\Console;

$model= 'anthropic.claude-3-5-sonnet-20240620-v1:0';
$runtime= (new ServiceEndpoint('bedrock', CredentialProvider::default()))
->using('bedrock-runtime.')
->in('eu-central-1')
;

$response= $runtime->resource('/model/{0}/converse-stream', [$model])->transmit([
'system' => [['text' => 'Use informal language']],
'messages' => [
['role' => 'user', 'content' => [['text' => $argv[1]]]],
],
'inferenceConfig' => [
'maxTokens' => 1000,
'temperature' => 0.5,
],
]);
foreach ($response->events() as $event) {
Console::writeLine($event->header(':event-type'), ': ', $event->value());
}
```


See also
--------
* [AWS Lambda for XP Framework](https://github.com/xp-forge/lambda)
Expand Down
73 changes: 73 additions & 0 deletions src/main/php/com/amazon/aws/api/Event.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php namespace com\amazon\aws\api;

use lang\{IllegalStateException, Value};
use util\{Comparison, Objects};
use text\json\{Json, StringInput};

/** @test com.amazon.aws.unittest.EventTest */
class Event implements Value {
use Comparison;

private $source, $headers, $content;

/**
* Creates a new event
*
* @param com.amazon.aws.api.EventStream $source
* @param [:var] $headers
* @param string $content
*/
public function __construct(EventStream $source, $headers, $content= '') {
$this->source= $source;
$this->headers= $headers;
$this->content= $content;
}

/** @return [:var] */
public function headers() { return $this->headers; }

/** @return string */
public function content() { return $this->content; }

/**
* Gets a header by name
*
* @param string $name
* @param var $default
* @return var
*/
public function header($name, $default= null) {
return $this->headers[$name] ?? $default;
}

/**
* Returns deserialized value, raising an error if the content
* type is unknown.
*
* @param ?string|lang.Type $type
* @return var
* @throws lang.IllegalStateException
*/
public function value($type= null) {
switch ($mime= ($this->headers[':content-type'] ?? null)) {
case 'application/json': $value= Json::read(new StringInput($this->content)); break;
case 'text/plain': $value= $this->content; break;
default: throw new IllegalStateException('Cannot deserialize '.($mime ?? 'without content type'));
}

return null === $type || null === $this->source->marshalling
? $value
: $this->source->marshalling->unmarshal($value, $type)
;
}

/** @return string */
public function toString() {
return (
nameof($this)." {\n".
' [headers] '.Objects::stringOf($this->headers, ' ')."\n".
' [content] '.$this->content."\n".
'}'
);
}
}
178 changes: 178 additions & 0 deletions src/main/php/com/amazon/aws/api/EventStream.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
<?php namespace com\amazon\aws\api;

use IteratorAggregate, Traversable;
use io\streams\InputStream;
use lang\IllegalStateException;
use util\data\Marshalling;
use util\{Bytes, Date, UUID};

/**
* Amazon event stream, mime type `application/vnd.amazon.eventstream`.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
* @see https://github.com/aws/aws-sdk-go-v2/blob/main/aws/protocol/eventstream/header_value.go
* @see com.amazon.aws.api.Response::events()
* @test com.amazon.aws.unittest.EventStreamTest
*/
class EventStream implements IteratorAggregate {
const FALSE = 1;
const TRUE = 0;
const BYTE = 2;
const SHORT = 3;
const INTEGER = 4;
const LONG = 5;
const BYTES = 6;
const STRING = 7;
const TIMESTAMP = 8;
const UUID = 9;

private $in;
public $marshalling;

/**
* Creates a new instance
*
* @param io.streams.InputStream $in
* @param ?util.data.Marshalliung $marshalling
*/
public function __construct(InputStream $in, $marshalling= null) {
$this->in= $in;
$this->marshalling= $marshalling;
}

/**
* Reads a given number of bytes
*
* @param int $length
* @return string
*/
private function read($length) {
$chunk= '';
do {
$chunk.= $this->in->read($length - strlen($chunk));
} while (strlen($chunk) < $length && $this->in->available());
return $chunk;
}

/**
* Parse headers from a given buffer
*
* @param string $buffer
* @return [:var] $headers
*/
private function headers($buffer) {
$headers= [];
$offset= 0;
$length= strlen($buffer);
while ($offset < $length) {
$l= ord($buffer[$offset++]);
$header= substr($buffer, $offset, $l);
$offset+= $l;

$t= ord($buffer[$offset++] ?? "\xff");
switch ($t) {
case self::TRUE:
$value= true;
break;

case self::FALSE:
$value= false;
break;

case self::BYTE:
$value= ord($buffer[$offset++]);
break;

case self::SHORT:
$value= unpack('n', substr($buffer, $offset, 2))[1];
$offset+= 2;
break;

case self::INTEGER:
$value= unpack('N', substr($buffer, $offset, 4))[1];
$offset+= 4;
break;

case self::LONG:
$value= unpack('J', substr($buffer, $offset, 8))[1];
$offset+= 8;
break;

case self::BYTES:
$l= unpack('n', substr($buffer, $offset, 2))[1];
$offset+= 2;
$value= new Bytes(substr($buffer, $offset, $l));
$offset+= $l;
break;

case self::STRING:
$l= unpack('n', substr($buffer, $offset, 2))[1];
$offset+= 2;
$value= substr($buffer, $offset, $l);
$offset+= $l;
break;

case self::TIMESTAMP:
$t= unpack('J', substr($buffer, $offset, 8))[1];
$value= new Date((int)($t / 1000));
$offset+= 8;
break;

case self::UUID:
$value= new UUID(new Bytes(substr($buffer, $offset, 16)));
$offset+= 16;
break;

default: throw new IllegalStateException('Unhandled type #'.$t);
}

$headers[$header]= $value;
}

return $headers;
}

/**
* Returns next event in stream or `null` if there is none left
*
* @return ?com.amazon.aws.api.Event
* @throws lang.IllegalStateException for checksum mismatches
*/
public function next() {
if (!$this->in->available()) return null;

$hash= hash_init('crc32b');
$buffer= $this->read(12);
hash_update($hash, $buffer);

$prelude= unpack('Ntotal/Nheaders/Nchecksum', $buffer);
if (sprintf('%u', crc32(substr($buffer, 0, 8))) !== (string)$prelude['checksum']) {
throw new IllegalStateException('Prelude checksum mismatch');
}

$buffer= $this->read($prelude['headers']);
$headers= $this->headers($buffer);
hash_update($hash, $buffer);

$buffer= $this->read($prelude['total'] - $prelude['headers'] - 16);
hash_update($hash, $buffer);

$checksum= unpack('N', $this->read(4))[1];
if (hexdec(hash_final($hash)) !== $checksum) {
throw new IllegalStateException('Payload checksum mismatch');
}

return new Event($this, $headers, $buffer);
}

/**
* Streams `com.amazon.aws.api.Event` instances
*
* @throws lang.IllegalStateException for checksum mismatches
*/
public function getIterator(): Traversable {
while (null !== ($next= $this->next())) {
yield $next;
}
}
}
14 changes: 14 additions & 0 deletions src/main/php/com/amazon/aws/api/Response.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ public function result($type= null) {
));
}

/**
* Returns an event stream if the content type is `application/vnd.amazon.eventstream`.
*
* @return ?com.amazon.aws.api.EventStream
* @throws lang.IllegalStateException
*/
public function events() {
$mime= $this->headers['Content-Type'][0] ?? null;
return 'application/vnd.amazon.eventstream' === $mime
? new EventStream($this->stream)
: null
;
}

/**
* Returns error, raising an error for non-error status codes or
* if the returned content type is unknown.
Expand Down
Loading

0 comments on commit c005462

Please sign in to comment.