Skip to content

Commit

Permalink
feat: add RMQ channels options, support for prefix for routing_key, add public API for middlewares (
Browse files Browse the repository at this point in the history
#1448)

* feat: add RMQ channels options, support for prefix for routing_key, add public API for middlewares

* tests: fix asyncapi tests

* chore: update dependencies

* fix: parse old NATS stream config if it exists

* feat (#1447): add StreamMessage.batch_headers attr to provide access to whole batch messages headers

* fix: add factory is_flag option

* feat: add batch_headers for Confluent
  • Loading branch information
Lancetnik authored May 16, 2024
1 parent d100d5f commit 1bcbcf5
Show file tree
Hide file tree
Showing 31 changed files with 589 additions and 96 deletions.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.5"
__version__ = "0.5.6"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
13 changes: 13 additions & 0 deletions faststream/broker/core/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ def __init__(
self._parser = parser
self._decoder = decoder

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register *middleware* as the innermost broker middleware.

    The middleware is appended to the end of the broker's middleware
    chain and propagated to every subscriber and publisher that is
    already registered on this broker.
    """
    self._middlewares = (*self._middlewares, middleware)

    # Subscribers first, then publishers — same order the broker stores them.
    for entity in (*self._subscribers.values(), *self._publishers.values()):
        entity.add_middleware(middleware)

@abstractmethod
def subscriber(
self,
Expand Down
2 changes: 2 additions & 0 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
TYPE_CHECKING,
Any,
Generic,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -38,6 +39,7 @@ class StreamMessage(Generic[MsgType]):

body: Union[bytes, Any]
headers: "AnyDict" = field(default_factory=dict)
batch_headers: List["AnyDict"] = field(default_factory=list)
path: "AnyDict" = field(default_factory=dict)

content_type: Optional[str] = None
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/publisher/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class PublisherProto(
_middlewares: Iterable["PublisherMiddleware"]
_producer: Optional["ProducerProto"]

@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

@staticmethod
@abstractmethod
def create() -> "PublisherProto[MsgType]":
Expand Down
10 changes: 9 additions & 1 deletion faststream/broker/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
from faststream.asyncapi.message import get_response_schema
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.publisher.proto import PublisherProto
from faststream.broker.types import MsgType, P_HandlerParams, T_HandlerReturn
from faststream.broker.types import (
BrokerMiddleware,
MsgType,
P_HandlerParams,
T_HandlerReturn,
)
from faststream.broker.wrapper.call import HandlerCallWrapper

if TYPE_CHECKING:
Expand Down Expand Up @@ -87,6 +92,9 @@ def __init__(
self.include_in_schema = include_in_schema
self.schema_ = schema_

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append *middleware* to this publisher's broker-middleware chain."""
    self._broker_middlewares = tuple(self._broker_middlewares) + (middleware,)

@override
def setup( # type: ignore[override]
self,
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/subscriber/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class SubscriberProto(
_broker_middlewares: Iterable["BrokerMiddleware[MsgType]"]
_producer: Optional["ProducerProto"]

@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

@staticmethod
@abstractmethod
def create() -> "SubscriberProto[MsgType]":
Expand Down
3 changes: 3 additions & 0 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def __init__(
self.description_ = description_
self.include_in_schema = include_in_schema

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append *middleware* to this subscriber's broker-middleware chain."""
    chain = list(self._broker_middlewares)
    chain.append(middleware)
    self._broker_middlewares = tuple(chain)

@override
def setup( # type: ignore[override]
self,
Expand Down
6 changes: 3 additions & 3 deletions faststream/cli/docs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def serve(
),
),
is_factory: bool = typer.Option(
False,
"--factory", help="Treat APP as an application factory"
False, "--factory", help="Treat APP as an application factory"
),
) -> None:
"""Serve project AsyncAPI schema."""
Expand Down Expand Up @@ -110,7 +109,8 @@ def gen(
),
is_factory: bool = typer.Option(
False,
"--factory", help="Treat APP as an application factory"
"--factory",
help="Treat APP as an application factory",
),
) -> None:
"""Generate project AsyncAPI schema."""
Expand Down
4 changes: 3 additions & 1 deletion faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def publish(
rpc: bool = typer.Option(False, help="Enable RPC mode and system output"),
is_factory: bool = typer.Option(
False,
"--factory", help="Treat APP as an application factory"
"--factory",
is_flag=True,
help="Treat APP as an application factory",
),
) -> None:
"""Publish a message using the specified broker in a FastStream application.
Expand Down
41 changes: 22 additions & 19 deletions faststream/confluent/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, Union

from faststream.broker.message import decode_message, gen_cor_id
from faststream.confluent.message import FAKE_CONSUMER, KafkaMessage
Expand All @@ -20,18 +20,14 @@ async def parse_message(
message: "Message",
) -> "StreamMessage[Message]":
"""Parses a Kafka message."""
headers = {}
if message.headers() is not None:
for i, j in message.headers(): # type: ignore[union-attr]
if isinstance(j, str):
headers[i] = j
else:
headers[i] = j.decode()
headers = _parse_msg_headers(message.headers())

body = message.value()
offset = message.offset()
_, timestamp = message.timestamp()

handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")

return KafkaMessage(
body=body,
headers=headers,
Expand All @@ -49,28 +45,29 @@ async def parse_message_batch(
message: Tuple["Message", ...],
) -> "StreamMessage[Tuple[Message, ...]]":
"""Parses a batch of messages from a Kafka consumer."""
body: List[Any] = []
batch_headers: List[Dict[str, str]] = []

first = message[0]
last = message[-1]

headers = {}
if first.headers() is not None:
for i, j in first.headers(): # type: ignore[union-attr]
if isinstance(j, str):
headers[i] = j
else:
headers[i] = j.decode()
body = [m.value() for m in message]
first_offset = first.offset()
last_offset = last.offset()
for m in message:
body.append(m.value)
batch_headers.append(_parse_msg_headers(m.headers()))

headers = next(iter(batch_headers), {})

_, first_timestamp = first.timestamp()

handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")

return KafkaMessage(
body=body,
headers=headers,
batch_headers=batch_headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
message_id=f"{first_offset}-{last_offset}-{first_timestamp}",
message_id=f"{first.offset()}-{last.offset()}-{first_timestamp}",
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
Expand All @@ -91,3 +88,9 @@ async def decode_message_batch(
) -> "DecodedMessage":
"""Decode a batch of messages."""
return [decode_message(await cls.parse_message(m)) for m in msg.raw_message]


def _parse_msg_headers(
headers: Sequence[Tuple[str, Union[bytes, str]]],
) -> Dict[str, str]:
return {i: j if isinstance(j, str) else j.decode() for i, j in headers}
17 changes: 14 additions & 3 deletions faststream/kafka/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from faststream.broker.message import decode_message, gen_cor_id
from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage
Expand Down Expand Up @@ -39,13 +39,24 @@ async def parse_message_batch(
message: Tuple["ConsumerRecord", ...],
) -> "StreamMessage[Tuple[ConsumerRecord, ...]]":
"""Parses a batch of messages from a Kafka consumer."""
body: List[Any] = []
batch_headers: List[Dict[str, str]] = []

first = message[0]
last = message[-1]
headers = {i: j.decode() for i, j in first.headers}

for m in message:
body.append(m.value)
batch_headers.append({i: j.decode() for i, j in m.headers})

headers = next(iter(batch_headers), {})

handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")

return KafkaMessage(
body=[m.value for m in message],
body=body,
headers=headers,
batch_headers=batch_headers,
reply_to=headers.get("reply_to", ""),
content_type=headers.get("content-type"),
message_id=f"{first.offset}-{last.offset}-{first.timestamp}",
Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,12 +623,12 @@ async def start(self) -> None:
)

except BadRequestError as e:
old_config = (await self.stream.stream_info(stream.name)).config

if (
e.description
== "stream name already in use with a different configuration"
):
old_config = (await self.stream.stream_info(stream.name)).config

self._log(str(e), logging.WARNING, log_context)
await self.stream.update_stream(
config=stream.config,
Expand Down
20 changes: 16 additions & 4 deletions faststream/nats/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional

from faststream.broker.message import StreamMessage, decode_message, gen_cor_id
from faststream.nats.message import NatsBatchMessage, NatsMessage
Expand Down Expand Up @@ -102,15 +102,27 @@ async def parse_batch(
self,
message: List["Msg"],
) -> "StreamMessage[List[Msg]]":
if first_msg := next(iter(message), None):
path = self.get_path(first_msg.subject)
body: List[bytes] = []
batch_headers: List[Dict[str, str]] = []

if message:
path = self.get_path(message[0].subject)

for m in message:
batch_headers.append(m.headers or {})
body.append(m.data)

else:
path = None

headers = next(iter(batch_headers), {})

return NatsBatchMessage(
raw_message=message,
body=[m.data for m in message],
body=body,
path=path or {},
headers=headers,
batch_headers=batch_headers,
)

async def decode_batch(
Expand Down
1 change: 1 addition & 0 deletions faststream/rabbit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
"ReplyConfig",
"RabbitExchange",
"RabbitQueue",
# Annotations
"RabbitMessage",
)
13 changes: 13 additions & 0 deletions faststream/rabbit/annotations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from aio_pika import RobustChannel, RobustConnection
from typing_extensions import Annotated

from faststream.annotations import ContextRepo, Logger, NoCast
Expand All @@ -13,8 +14,20 @@
"RabbitMessage",
"RabbitBroker",
"RabbitProducer",
"Channel",
"Connection",
)

RabbitMessage = Annotated[RM, Context("message")]
RabbitBroker = Annotated[RB, Context("broker")]
RabbitProducer = Annotated[AioPikaFastProducer, Context("broker._producer")]

Channel = Annotated[RobustChannel, Context("broker._channel")]
Connection = Annotated[RobustConnection, Context("broker._connection")]

# NOTE: transaction is not for the public usage yet
# async def _get_transaction(connection: Connection) -> RabbitTransaction:
# async with connection.channel(publisher_confirms=False) as channel:
# yield channel.transaction()

# Transaction = Annotated[RabbitTransaction, Depends(_get_transaction)]
Loading

0 comments on commit 1bcbcf5

Please sign in to comment.