Usage with FastStream¶
that-depends is out of the box compatible with faststream.Depends():
from typing import Annotated
from faststream import Depends
from faststream.asgi import AsgiFastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker()
app = AsgiFastStream(broker)
@broker.subscriber(queue="queue")
async def process(
text: str,
suffix: Annotated[
str, Depends(Container.suffix_factory) # (1)!
],
) -> None:
return text + suffix
- This would be the same as
Provide[Container.suffix_factory]
Context Middleware¶
If you are using ContextResource provider, you likely will want to
initialize a context before processing message with faststream.
that-depends provides integration for these use cases:
Then you can use the DIContextMiddleware with your broker:
from that_depends.integrations.faststream import DIContextMiddleware
from that_depends import ContextScopes
from faststream.rabbit import RabbitBroker
broker = RabbitBroker(middlewares=[DIContextMiddleware(Container, scope=ContextScopes.REQUEST)])
Example¶
Here is an example that includes life-cycle events:
import datetime
import contextlib
import typing
from faststream import FastStream, Depends, Logger
from faststream.rabbit import RabbitBroker
from tests import container
@contextlib.asynccontextmanager
async def lifespan_manager() -> typing.AsyncIterator[None]:
try:
yield
finally:
await container.DIContainer.tear_down()
broker = RabbitBroker()
app = FastStream(broker, lifespan=lifespan_manager)
@broker.subscriber("in")
async def read_root(
logger: Logger,
some_dependency: typing.Annotated[
container.DependentFactory,
Depends(container.DIContainer.dependent_factory)
],
) -> datetime.datetime:
startup_time = some_dependency.async_resource
logger.info(startup_time)
return startup_time
@app.after_startup
async def t() -> None:
await broker.publish(None, "in")