Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chains Streaming, fixes BT-10339 #1261

Merged
merged 11 commits into from
Dec 2, 2024
Merged

Chains Streaming, fixes BT-10339 #1261

merged 11 commits into from
Dec 2, 2024

Conversation

marius-baseten
Copy link
Contributor

@marius-baseten marius-baseten commented Nov 22, 2024

🚀 What

Support raw string/byte streaming and streaming of structured (pydantic) data with separate item and optional header/footer types.

  • New chains example (also used for testing).
  • Not supported (not recommended) for sync functions.
  • Remove CI check on doc sync (no useful).

💻 How

  • Adapt code gen and framework/validations to allow AsyncIterator[bytes|str] endpoints. Enforce that generator functions have this annotation and vice versa.
  • Rename internal terminology from generator to streaming.
  • Bugfix in truss schema to be invariant against Iterator/Generator annotations.
  • Fix docker internal URLs in DockerTrussService.
  • Add method to consume streaming responses in stub (with retries/error handling).
  • Helper classes StreamWriter and StreamReader, linked by StreamTypes to consistently deserialize and serialize structured data. Some fun typing to make this work variadically...

🔬 Testing

  • E2E testing for "happy" case both local and docker mode.
  • Unittesting for streaming.py.

@marius-baseten marius-baseten changed the title It works! Next step is clean up. Get types right. Maybe move some thi… Chains Streaming, related BT-10339 Nov 26, 2024
Copy link

linear bot commented Nov 26, 2024

@marius-baseten marius-baseten requested review from tyranitar and squidarth and removed request for tyranitar November 26, 2024 01:16
Copy link
Contributor

@tyranitar tyranitar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Just a few minor comments.

truss-chains/truss_chains/framework.py Outdated Show resolved Hide resolved
truss-chains/truss_chains/streaming.py Show resolved Hide resolved
Copy link
Collaborator

@squidarth squidarth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great! One implementation question -- what's the story for how this could be consumed by the end user?

truss-chains/truss_chains/streaming.py Outdated Show resolved Hide resolved
if __name__ == "__main__":
with chains.run_local():
chain = Consumer()
result = asyncio.run(chain.run_remote())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the final chain streams output, what would be an easy way of consuming that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have 3 choices:

  • raw strings: works directly.
  • raw bytes: works directly.
  • structured/typed pydantic models: you create stream_reader with the same model definitions client side.

For the last one, we can discuss ways to distribute that implementation. It depends only on pydantic and builtins, so you wouldn't need to install the whole truss package, you just need that source file. Or we could even generate a "client".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the last one, we can discuss ways to distribute that implementation. It depends only on pydantic and builtins, so you wouldn't need to install the whole truss package, you just need that source file. Or we could even generate a "client".

Sounds good

return struct.pack(">BI", delimiter.value, length)

def _serialize(self, obj: pydantic.BaseModel, delimiter: _Delimiter) -> bytes:
data_bytes = obj.model_dump_json().encode()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to be clear, if someone has binary data (say audio) that they are sending back, they don't have to use this right? They would just stream the bytes directly?

I'm not sure how a pydantic model would serialize a field that contained binary data

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to return raw binary data, you don't need any of these streamers, you can directly stream the bytes.

@marius-baseten marius-baseten changed the title Chains Streaming, related BT-10339 Chains Streaming, fixes BT-10339 Dec 2, 2024
@marius-baseten marius-baseten merged commit 7472bb6 into main Dec 2, 2024
12 checks passed
@marius-baseten marius-baseten deleted the marius/stream-byte branch December 2, 2024 21:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants