Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions celery-stubs/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from celery import local
from celery._state import current_app, current_task
from types import ModuleType
from typing import Any, NamedTuple

from celery import execute as execute
from celery import local as local
from celery import messaging as messaging
from celery._state import current_app as current_app
from celery._state import current_task as current_task
from celery.app import bugreport as bugreport
from celery.app import shared_task
from celery.app.base import Celery
from celery.app.task import Task
Expand All @@ -13,22 +20,73 @@ from celery.canvas import (
xmap,
xstarmap,
)
from celery.canvas import maybe_signature as maybe_signature
from celery.canvas import signature as subtask
from celery.utils import uuid

log: ModuleType
registry: ModuleType

class version_info_t(NamedTuple):
major: int
minor: int
micro: int
releaselevel: str
serial: str

VERSION: version_info_t
version_info: version_info_t
VERSION_BANNER: str
SERIES: str
__version__: str
__author__: str
__contact__: str
__homepage__: str
__docformat__: str

def _find_option_with_arg(
argv: list[str], short_opts: str | None = ..., long_opts: list[str] | None = ...
) -> str | None: ...
def maybe_patch_concurrency(
argv: list[str] | None = ...,
short_opts: str | None = ...,
long_opts: list[str] | None = ...,
patches: dict[str, Any] | None = ...,
) -> None: ...

__all__ = (
"SERIES",
"VERSION",
"VERSION_BANNER",
"Celery",
"Signature",
"Task",
"__author__",
"__contact__",
"__docformat__",
"__homepage__",
"__version__",
"_find_option_with_arg",
"bugreport",
"chain",
"chord",
"chunks",
"current_app",
"current_task",
"execute",
"group",
"local",
"log",
"maybe_patch_concurrency",
"maybe_signature",
"messaging",
"registry",
"shared_task",
"signature",
"subtask",
"uuid",
"version_info",
"version_info_t",
"xmap",
"xstarmap",
)
5 changes: 5 additions & 0 deletions celery-stubs/__main__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from typing import Any

__all__ = ("main",)

def main() -> Any: ...
17 changes: 17 additions & 0 deletions celery-stubs/_state.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
from collections.abc import Callable
from typing import Any

from celery.app.base import Celery
from celery.app.task import Task

__all__ = (
"connect_on_app_finalize",
"current_app",
"current_task",
"get_current_app",
"get_current_task",
"get_current_worker_task",
"set_default_app",
)

current_app: Celery
current_task: Task[Any, Any]

def get_current_task() -> Task[Any, Any]: ...
def get_current_app() -> Celery: ...
def get_current_worker_task() -> Task[Any, Any] | None: ...
def set_default_app(app: Celery) -> None: ...
def connect_on_app_finalize(
callback: Callable[[Celery], Any],
) -> Callable[[Celery], Any]: ...
43 changes: 42 additions & 1 deletion celery-stubs/app/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,55 @@ from typing import (
overload,
)

from celery.app import beat as beat
from celery.app import control as control
from celery.app import events as events
from celery.app import task as task
from celery.app.base import Celery
from celery.app.task import Context, Task
from celery.utils.threads import _LocalStack
from typing_extensions import ParamSpec

__all__ = (
"AppPickler",
"Celery",
"app_or_default",
"bugreport",
"default_app",
"disable_trace",
"enable_trace",
"pop_current_task",
"push_current_task",
"shared_task",
)

class AppPickler:
def __call__(self, cls: type[Celery], *args: Any) -> Celery: ...
def build_kwargs(self, *args: Any) -> dict[str, Any]: ...
def build_standard_kwargs(
self,
main: str | None,
changes: dict[str, Any] | None,
loader: Any,
backend: Any,
amqp: Any,
events: Any, # noqa: F811
log: Any,
control: Any, # noqa: F811
accept_magic_kwargs: bool,
config_source: Any | None = None,
) -> dict[str, Any]: ...
def construct(self, cls: type[Celery], **kwargs: Any) -> Celery: ...
def prepare(self, app: Celery, **kwargs: Any) -> dict[str, Any]: ...

def app_or_default(app: Celery | None = ...) -> Celery: ...
def bugreport(app: Celery | None = ...) -> str: ...
def enable_trace() -> None: ...
def disable_trace() -> None: ...
def push_current_task(obj: Task[Any, Any]) -> None: ...
def pop_current_task() -> Task[Any, Any] | None: ...

default_app: Celery

_T = TypeVar("_T", bound=Task[Any, Any])
_P = ParamSpec("_P")
_R = TypeVar("_R")
Expand Down
159 changes: 158 additions & 1 deletion celery-stubs/app/amqp.pyi
Original file line number Diff line number Diff line change
@@ -1 +1,158 @@
class AMQP: ...
from datetime import datetime
from typing import Any, NamedTuple, TypeAlias

import kombu
import kombu.pools
from celery.app.base import Celery
from celery.app.routes import Router as RouterClass
from kombu.transport.base import StdChannel

__all__ = ("AMQP", "Queues", "task_message")

class task_message(NamedTuple):
headers: dict[str, Any]
properties: dict[str, Any]
body: tuple[Any, ...]
sent_event: dict[str, Any] | None

class Queues(dict[str, kombu.Queue]):
def __init__(
self,
queues: list[kombu.Queue] | dict[str, kombu.Queue] | None = None,
default_exchange: kombu.Exchange | None = None,
create_missing: bool = True,
create_missing_queue_type: str | None = None,
create_missing_queue_exchange_type: str | None = None,
autoexchange: kombu.Exchange | None = None,
max_priority: int | None = None,
default_routing_key: str | None = None,
) -> None: ...
def __missing__(self, name: str) -> kombu.Queue: ...
def add(self, queue: kombu.Queue, **kwargs: Any) -> None: ...
def add_compat(self, name: str, **options: Any) -> kombu.Queue: ...
@property
def consume_from(self) -> dict[str, kombu.Queue]: ...
def deselect(self, exclude: list[str]) -> None: ...
def format(self, indent: int = 0, indent_first: bool = True) -> str: ...
def new_missing(self, name: str) -> kombu.Queue: ...
def select(self, include: list[str]) -> None: ...
def select_add(self, queue: kombu.Queue, **kwargs: Any) -> None: ...

# Type alias to avoid conflict with AMQP.Queues method
_Queues: TypeAlias = Queues

class AMQP:
# Class attributes
BrokerConnection: type[kombu.Connection]
Connection: type[kombu.Connection]
Consumer: type[kombu.Consumer]
Producer: type[kombu.Producer]
queues_cls: type[_Queues]
argsrepr_maxsize: int
kwargsrepr_maxsize: int
autoexchange: kombu.Exchange | None

app: Celery

def __init__(self, app: Celery) -> None: ...
def TaskConsumer(
self,
channel: StdChannel,
queues: list[kombu.Queue] | None = None,
accept: list[str] | None = None,
**kw: Any,
) -> kombu.Consumer: ...
def Queues(
self,
queues: list[kombu.Queue] | dict[str, kombu.Queue],
create_missing: bool | None = None,
create_missing_queue_type: str | None = None,
create_missing_queue_exchange_type: str | None = None,
autoexchange: kombu.Exchange | None = None,
max_priority: int | None = None,
) -> _Queues: ...
def Router(
self,
queues: _Queues | None = None,
create_missing: bool | None = None,
) -> RouterClass: ...
def flush_routes(self) -> None: ...
def as_task_v1(
self,
task_id: str,
name: str,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
countdown: float | None = None,
eta: datetime | None = None,
group_id: str | None = None,
group_index: int | None = None,
expires: float | datetime | None = None,
retries: int = 0,
chord: Any | None = None,
callbacks: list[Any] | None = None,
errbacks: list[Any] | None = None,
reply_to: str | None = None,
time_limit: int | None = None,
soft_time_limit: int | None = None,
create_sent_event: bool = False,
root_id: str | None = None,
parent_id: str | None = None,
shadow: str | None = None,
now: datetime | None = None,
timezone: Any | None = None,
**compat_kwargs: Any,
) -> task_message: ...
def as_task_v2(
self,
task_id: str,
name: str,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
countdown: float | None = None,
eta: datetime | None = None,
group_id: str | None = None,
group_index: int | None = None,
expires: float | datetime | None = None,
retries: int = 0,
chord: Any | None = None,
callbacks: list[Any] | None = None,
errbacks: list[Any] | None = None,
reply_to: str | None = None,
time_limit: int | None = None,
soft_time_limit: int | None = None,
create_sent_event: bool = False,
root_id: str | None = None,
parent_id: str | None = None,
shadow: str | None = None,
chain: Any | None = None,
now: datetime | None = None,
timezone: Any | None = None,
origin: str | None = None,
ignore_result: bool = False,
argsrepr: str | None = None,
kwargsrepr: str | None = None,
stamped_headers: list[str] | None = None,
replaced_task_nesting: int = 0,
**options: Any,
) -> task_message: ...
@property
def create_task_message(self) -> Any: ...
@property
def default_exchange(self) -> kombu.Exchange: ...
@property
def default_queue(self) -> kombu.Queue: ...
@property
def producer_pool(self) -> kombu.pools.ProducerPool: ...
@property
def publisher_pool(self) -> kombu.pools.ProducerPool: ...
@property
def queues(self) -> _Queues: ...
@property
def router(self) -> RouterClass: ...
@property
def routes(self) -> list[dict[str, Any]]: ...
@property
def send_task_message(self) -> Any: ...
@property
def utc(self) -> bool: ...
11 changes: 11 additions & 0 deletions celery-stubs/app/annotations.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Any

__all__ = ("MapAnnotation", "prepare", "resolve_all")

class MapAnnotation:
def __init__(self, d: dict[str, Any]) -> None: ...
def annotate(self, task: Any) -> dict[str, Any] | None: ...
def annotate_any(self) -> dict[str, Any] | None: ...

def prepare(annotations: Any) -> Any: ...
def resolve_all(anno: Any, task: Any) -> dict[str, Any]: ...
4 changes: 4 additions & 0 deletions celery-stubs/app/autoretry.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from collections.abc import Callable
from typing import Any

def add_autoretry_behaviour(task: Any, **options: Any) -> Callable[..., Any]: ...
15 changes: 15 additions & 0 deletions celery-stubs/app/backends.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Any

from celery.backends.base import Backend

__all__ = ("by_name", "by_url")

BACKEND_ALIASES: dict[str, str]
UNKNOWN_BACKEND: str

def by_name(
backend: str | None = None,
loader: Any | None = None,
extension_namespace: str = "celery.result_backends",
) -> type[Backend]: ...
def by_url(backend: str | None = None, loader: Any | None = None) -> type[Backend]: ...
Loading
Loading