Skip to content

Commit

Permalink
♻️ PEP 585 - Type Hinting Generics In Standard Collections (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
perdy committed Nov 24, 2024
1 parent 1fbe6da commit 429b4d2
Show file tree
Hide file tree
Showing 78 changed files with 998 additions and 1,025 deletions.
4 changes: 2 additions & 2 deletions examples/add_model_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@


class X(BaseModel):
input: typing.List[typing.Any] = Field(title="input", description="Model input")
input: list[typing.Any] = Field(title="input", description="Model input")


class Y(BaseModel):
output: typing.List[typing.Any] = Field(title="output", description="Model output")
output: list[typing.Any] = Field(title="output", description="Model output")


app.schema.register_schema("X", X)
Expand Down
4 changes: 1 addition & 3 deletions examples/data_schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import typing

from pydantic import BaseModel, validator

import flama
Expand Down Expand Up @@ -34,7 +32,7 @@ def home():
return {"hello": "world"}


def list_puppies(name: str = None) -> typing.List[Puppy]:
def list_puppies(name: str = None) -> list[Puppy]:
"""
tags:
- puppy
Expand Down
3 changes: 1 addition & 2 deletions examples/pagination.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import string
import typing

from pydantic import BaseModel, validator

Expand Down Expand Up @@ -66,7 +65,7 @@ def alphabet(**kwargs):

@app.route("/puppy/", methods=["GET"])
@app.paginator.page_number
def puppies(name: str = None, **kwargs) -> typing.List[Puppy]:
def puppies(name: str = None, **kwargs) -> list[Puppy]:
"""
tags:
- puppy
Expand Down
28 changes: 13 additions & 15 deletions flama/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ class Flama:
def __init__(
self,
routes: t.Optional[t.Sequence[t.Union["BaseRoute", "Mount"]]] = None,
components: t.Optional[t.Union[t.Sequence[injection.Component], t.Set[injection.Component]]] = None,
modules: t.Optional[t.Union[t.Sequence["Module"], t.Set["Module"]]] = None,
components: t.Optional[t.Union[t.Sequence[injection.Component], set[injection.Component]]] = None,
modules: t.Optional[t.Union[t.Sequence["Module"], set["Module"]]] = None,
middleware: t.Optional[t.Sequence["Middleware"]] = None,
debug: bool = False,
events: t.Optional[
t.Union[t.Dict[str, t.List[t.Callable[..., t.Coroutine[t.Any, t.Any, None]]]], Events]
] = None,
events: t.Optional[t.Union[dict[str, list[t.Callable[..., t.Coroutine[t.Any, t.Any, None]]]], Events]] = None,
lifespan: t.Optional[t.Callable[[t.Optional["Flama"]], t.AsyncContextManager]] = None,
title: str = "Flama",
version: str = "0.1.0",
Expand Down Expand Up @@ -185,7 +183,7 @@ def add_component(self, component: injection.Component):
self.router.build(self)

@property
def routes(self) -> t.List["BaseRoute"]:
def routes(self) -> list["BaseRoute"]:
"""List of registered routes.
:return: Routes.
Expand All @@ -196,12 +194,12 @@ def add_route(
self,
path: t.Optional[str] = None,
endpoint: t.Optional[types.HTTPHandler] = None,
methods: t.Optional[t.List[str]] = None,
methods: t.Optional[list[str]] = None,
name: t.Optional[str] = None,
include_in_schema: bool = True,
route: t.Optional["Route"] = None,
pagination: t.Optional[t.Union[str, "PaginationType"]] = None,
tags: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[dict[str, t.Any]] = None,
) -> "Route":
"""Register a new HTTP route or endpoint under given path.
Expand Down Expand Up @@ -229,11 +227,11 @@ def add_route(
def route(
self,
path: str,
methods: t.Optional[t.List[str]] = None,
methods: t.Optional[list[str]] = None,
name: t.Optional[str] = None,
include_in_schema: bool = True,
pagination: t.Optional[t.Union[str, "PaginationType"]] = None,
tags: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[dict[str, t.Any]] = None,
) -> t.Callable[[types.HTTPHandler], types.HTTPHandler]:
"""Decorator version for registering a new HTTP route in this router under given path.
Expand Down Expand Up @@ -262,7 +260,7 @@ def add_websocket_route(
name: t.Optional[str] = None,
route: t.Optional["WebSocketRoute"] = None,
pagination: t.Optional[t.Union[str, "PaginationType"]] = None,
tags: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[dict[str, t.Any]] = None,
) -> "WebSocketRoute":
"""Register a new websocket route or endpoint under given path.
Expand All @@ -282,7 +280,7 @@ def websocket_route(
path: str,
name: t.Optional[str] = None,
pagination: t.Optional[t.Union[str, "PaginationType"]] = None,
tags: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[dict[str, t.Any]] = None,
) -> t.Callable[[types.WebSocketHandler], types.WebSocketHandler]:
"""Decorator version for registering a new websocket route in this router under given path.
Expand All @@ -300,7 +298,7 @@ def mount(
app: t.Optional[types.App] = None,
name: t.Optional[str] = None,
mount: t.Optional["Mount"] = None,
tags: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[dict[str, t.Any]] = None,
) -> "Mount":
"""Register a new mount point containing an ASGI app in this router under given path.
Expand Down Expand Up @@ -345,7 +343,7 @@ def decorator(func: t.Callable) -> t.Callable:

return decorator

def add_exception_handler(self, exc_class_or_status_code: t.Union[int, t.Type[Exception]], handler: t.Callable):
def add_exception_handler(self, exc_class_or_status_code: t.Union[int, type[Exception]], handler: t.Callable):
"""Add a new exception handler for given status code or exception class.
:param exc_class_or_status_code: Status code or exception class.
Expand All @@ -369,7 +367,7 @@ def resolve_url(self, name: str, **path_params: t.Any) -> url.URL:
"""
return self.router.resolve_url(name, **path_params)

def resolve_route(self, scope: types.Scope) -> t.Tuple[BaseRoute, types.Scope]:
def resolve_route(self, scope: types.Scope) -> tuple[BaseRoute, types.Scope]:
"""Look for a route that matches given ASGI scope.
:param scope: ASGI scope.
Expand Down
2 changes: 1 addition & 1 deletion flama/authentication/jwt/claims.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class ClaimValidator(abc.ABC):
claim: t.ClassVar[str]

def __init__(self, payload: "Payload", claims: t.Dict[str, t.Any]) -> None:
def __init__(self, payload: "Payload", claims: dict[str, t.Any]) -> None:
self.value = claims.get(self.claim)
self.payload = payload

Expand Down
6 changes: 3 additions & 3 deletions flama/authentication/jwt/jws.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class JWS:
}

@classmethod
def _get_algorithm(cls, header: t.Dict[str, t.Any]) -> "SignAlgorithm":
def _get_algorithm(cls, header: dict[str, t.Any]) -> "SignAlgorithm":
"""Get the algorithm to sign the token.
It gets the algorithm from the header, and it returns the corresponding algorithm implementation.
Expand All @@ -46,7 +46,7 @@ def _get_algorithm(cls, header: t.Dict[str, t.Any]) -> "SignAlgorithm":
return cls.ALGORITHMS[header["alg"]]

@classmethod
def encode(cls, header: t.Dict[str, t.Any], payload: t.Dict[str, t.Any], key: bytes) -> bytes:
def encode(cls, header: dict[str, t.Any], payload: dict[str, t.Any], key: bytes) -> bytes:
"""Encode a JWS token.
It generates a signed token using the given key. The result is a JWT token with a format of:
Expand All @@ -67,7 +67,7 @@ def encode(cls, header: t.Dict[str, t.Any], payload: t.Dict[str, t.Any], key: by
return b".".join([header_segment, payload_segment, signature])

@classmethod
def decode(cls, token: bytes, key: bytes) -> t.Tuple[t.Dict[str, t.Any], t.Dict[str, t.Any], bytes]:
def decode(cls, token: bytes, key: bytes) -> tuple[dict[str, t.Any], dict[str, t.Any], bytes]:
"""Decode a JWS token.
It decode and validate the signature of the token. The token format must be: <header>.<payload>.<signature>
Expand Down
14 changes: 7 additions & 7 deletions flama/authentication/jwt/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Header:
alg: t.Optional[str] = None
cty: t.Optional[str] = None

def asdict(self) -> t.Dict[str, t.Any]:
def asdict(self) -> dict[str, t.Any]:
"""Return the header as a dictionary.
The fields are sorted alphabetically and the None values are removed.
Expand All @@ -62,7 +62,7 @@ class Payload:
standard and it is not validated when the token is decoded.
"""

data: t.Dict[str, t.Any]
data: dict[str, t.Any]
iss: t.Optional[str] = None
sub: t.Optional[str] = None
aud: t.Optional[str] = None
Expand All @@ -73,7 +73,7 @@ class Payload:

def __init__(
self,
data: t.Optional[t.Dict[str, t.Any]] = None,
data: t.Optional[dict[str, t.Any]] = None,
iss: t.Optional[str] = None,
sub: t.Optional[str] = None,
aud: t.Optional[str] = None,
Expand Down Expand Up @@ -113,7 +113,7 @@ def __init__(
object.__setattr__(self, "jti", jti)
object.__setattr__(self, "data", {**(data or {}), **kwargs})

def asdict(self) -> t.Dict[str, t.Any]:
def asdict(self) -> dict[str, t.Any]:
"""Return the payload as a dictionary.
The fields are sorted alphabetically and the None values are removed.
Expand All @@ -138,7 +138,7 @@ class JWT:
header: Header
payload: Payload

def __init__(self, header: t.Dict[str, t.Any], payload: t.Dict[str, t.Any]) -> None:
def __init__(self, header: dict[str, t.Any], payload: dict[str, t.Any]) -> None:
object.__setattr__(self, "header", Header(**header))
object.__setattr__(self, "payload", Payload(**payload))

Expand Down Expand Up @@ -188,7 +188,7 @@ def decode(cls, token: bytes, key: bytes) -> "JWT":

return decoded_token

def validate(self, validators: t.Optional[t.List[claims.ClaimValidator]] = None, **claims: t.Any) -> None:
def validate(self, validators: t.Optional[list[claims.ClaimValidator]] = None, **claims: t.Any) -> None:
"""Validate the token claims.
It validates all the default claims in the payload in the following order:
Expand Down Expand Up @@ -220,7 +220,7 @@ def validate(self, validators: t.Optional[t.List[claims.ClaimValidator]] = None,
if invalid_claims:
raise exceptions.JWTValidateException(f"Invalid claims ({', '.join(invalid_claims)})")

def asdict(self) -> t.Dict[str, t.Any]:
def asdict(self) -> dict[str, t.Any]:
"""Return the JWT as a dictionary.
:return: JWT as a dictionary.
Expand Down
2 changes: 1 addition & 1 deletion flama/authentication/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def __call__(self, scope: "types.Scope", receive: "types.Receive", send: "

await response(scope, receive, send)

def _get_permissions(self, route: "BaseRoute") -> t.Set[str]:
def _get_permissions(self, route: "BaseRoute") -> set[str]:
return set(route.tags.get("permissions", []))

async def _get_response(self, scope: "types.Scope", receive: "types.Receive") -> t.Union["Response", "Flama"]:
Expand Down
2 changes: 1 addition & 1 deletion flama/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
concurrency: t.Union[Concurrency, str],
func: t.Callable[P, t.Union[None, t.Awaitable[None]]],
*args: P.args,
**kwargs: P.kwargs
**kwargs: P.kwargs,
) -> None:
self.func = task_wrapper(func)
self.args = args
Expand Down
2 changes: 1 addition & 1 deletion flama/cli/commands/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def command(flama_config: str, create_config: str):
fs.write(ExampleConfig.build(mode=create_config).dumps())
return

with open(flama_config, "r") as fs:
with open(flama_config) as fs:
config = Config.load(fs) # type: ignore[arg-type]

config.run()
Expand Down
6 changes: 3 additions & 3 deletions flama/cli/config/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def context(self) -> t.Generator[_AppContext, None, None]:
...

@classmethod
def build(cls, app: t.Union[str, t.Dict[str, t.Any], "Flama"]) -> "App":
def build(cls, app: t.Union[str, dict[str, t.Any], "Flama"]) -> "App":
if isinstance(app, str):
return StrApp(app)

Expand Down Expand Up @@ -110,12 +110,12 @@ class DictApp(App):
description: str = "Fire up with the flame"
schema: str = "/schema/"
docs: str = "/docs/"
models: t.List[Model] = dataclasses.field(
models: list[Model] = dataclasses.field(
default_factory=lambda: [Model(url="/model-url/", path="model-path.flm", name="model-name")]
)

@classmethod
def from_dict(cls, data: t.Dict[str, t.Any]) -> "App":
def from_dict(cls, data: dict[str, t.Any]) -> "App":
if "models" in data:
data["models"] = [Model(**model) for model in data.pop("models")]
return cls(**data) # type: ignore[arg-type]
Expand Down
6 changes: 3 additions & 3 deletions flama/cli/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ class Config:
server: Uvicorn = dataclasses.field(default_factory=Uvicorn)

@classmethod
def from_dict(cls, data: t.Dict[str, t.Any]) -> "Config":
def from_dict(cls, data: dict[str, t.Any]) -> "Config":
return cls(
**{**data, "app": App.build(data["app"]), "server": Uvicorn(**data["server"])} # type: ignore[arg-type]
)

def to_dict(self) -> t.Dict[str, t.Any]:
def to_dict(self) -> dict[str, t.Any]:
return dataclasses.asdict(self)

@classmethod
Expand All @@ -42,7 +42,7 @@ def dump(self, fs: io.StringIO) -> None:
fs.write(self.dumps())

@classmethod
def dump_example(cls, type: str) -> t.Dict[str, t.Any]:
def dump_example(cls, type: str) -> dict[str, t.Any]:
result = cls().to_dict()
if type == "simple":
result["server"] = {k: v for k, v in result["server"] if k in ("host", "port")}
Expand Down
Loading

0 comments on commit 429b4d2

Please sign in to comment.