qq.ext.tasks – asyncio.Task 助手

1.1.0 新版功能.

制作机器人时最常见的操作之一是以指定的时间间隔在后台运行循环。 这种模式很常见,但有很多你需要注意的事情:

  • 我如何处理 asyncio.CancelledError

  • 如果互联网不可用,我该怎么办?

  • 无论如何,我可以断线的最大秒数是多少?

这个 qq.py 扩展的目标是让你摆脱所有这些担忧。

教程

一个简单的后台任务 Cog :

from qq.ext import tasks, commands

class MyCog(commands.Cog):
    def __init__(self):
        self.index = 0
        self.printer.start()

    def cog_unload(self):
        self.printer.cancel()

    @tasks.loop(seconds=5.0)
    async def printer(self):
        print(self.index)
        self.index += 1

在重新连接期间添加要处理的异常:

import asyncpg
from qq.ext import tasks, commands

class MyCog(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.data = []
        self.batch_update.add_exception_type(asyncpg.PostgresConnectionError)
        self.batch_update.start()

    def cog_unload(self):
        self.batch_update.cancel()

    @tasks.loop(minutes=5.0)
    async def batch_update(self):
        async with self.bot.pool.acquire() as con:
            # batch update here...
            pass

退出前循环一定次数:

from qq.ext import tasks

@tasks.loop(seconds=5.0, count=5)
async def slow_count():
    print(slow_count.current_loop)

@slow_count.after_loop
async def after_slow_count():
    print('done!')

slow_count.start()

在循环开始之前等待机器人准备就绪:

from qq.ext import tasks, commands

class MyCog(commands.Cog):
    def __init__(self, bot):
        self.index = 0
        self.bot = bot
        self.printer.start()

    def cog_unload(self):
        self.printer.cancel()

    @tasks.loop(seconds=5.0)
    async def printer(self):
        print(self.index)
        self.index += 1

    @printer.before_loop
    async def before_printer(self):
        print('waiting...')
        await self.bot.wait_until_ready()

在取消期间做某事:

from qq.ext import tasks, commands
import asyncio

class MyCog(commands.Cog):
    def __init__(self, bot):
        self.bot= bot
        self._batch = []
        self.lock = asyncio.Lock()
        self.bulker.start()

    async def do_bulk(self):
        # bulk insert data here
        ...

    @tasks.loop(seconds=10.0)
    async def bulker(self):
        async with self.lock:
            await self.do_bulk()

    @bulker.after_loop
    async def on_bulker_cancel(self):
        if self.bulker.is_being_cancelled() and len(self._batch) != 0:
            # if we're cancelled and we have some data left...
            # let's insert it to our database
            await self.do_bulk()

API 参考

class qq.ext.tasks.Loop

抽象循环和重新连接逻辑的后台任务助手。创建它的主要方法是通过 loop()

@after_loop

一个装饰器,注册一个在循环完成运行后要调用的协程。协程必须不带任何参数(类上下文中的 self 除外)。

注解

即使在取消期间也会调用此协程。如果需要区分某事是否被取消,请检查 is_being_cancelled() 是否为 True

参数

coro (coroutine) – 循环完成后要注册的协程。

引发

TypeError – 该函数不是协程。

@before_loop

在循环开始运行之前注册要调用的协程的装饰器。如果你想在循环开始之前等待一些机器人状态,这很有用,例如 qq.Client.wait_until_ready()

协程必须不带任何参数(类上下文中的 self 除外)。

参数

coro (coroutine) – 在循环运行之前注册的协程。

引发

TypeError – 该函数不是协程。

@error

A decorator that registers a coroutine to be called if the task encounters an unhandled exception. The coroutine must take only one argument the exception raised (except self in a class context). By default this prints to sys.stderr however it could be overridden to have a different implementation. .. versionadded:: 1.4 :param coro: The coroutine to register in the event of an unhandled exception. :type coro: coroutine

引发

TypeError – The function was not a coroutine.

property seconds: Optional[float]

每次迭代之间的秒数的只读值。如果传递了明确的 time 值,则为 None

Type

Optional[float]

property minutes: Optional[float]

每次迭代之间的分钟数的只读值。如果传递了明确的 time 值,则为 None

Type

Optional[float]

property hours: Optional[float]

每次迭代之间的小时数的只读值。如果传递了明确的 time 值,则为 None

Type

Optional[float]

property time: Optional[List[datetime.time]]

此循环运行的确切时间的只读列表。如果通过了相对时间,则为 None

Type

Optional[List[datetime.time]]

property current_loop: int

循环的当前迭代。

Type

int

property next_iteration: Optional[datetime.datetime]

当循环的下一次迭代将发生时。

Type

Optional[datetime.datetime]

await __call__(*args, **kwargs)

这个函数是一个 coroutine。 调用任务持有的内部回调。 .. versionadded:: 1.1.0

参数
  • *args – 要使用的参数。

  • **kwargs – 要使用的关键字参数。

start(*args, **kwargs)

在事件循环中启动内部任务。

参数
  • *args – 要使用的参数。

  • **kwargs – 要使用的关键字参数。

引发

RuntimeError – 任务已启动并正在运行。

返回

已创建的任务。

返回类型

asyncio.Task

stop()

优雅地停止任务运行。与 cancel()不同,这允许任务在正常退出之前完成其当前迭代。

注解

如果内部函数引发了一个可以在完成之前处理的错误,那么它将重试直到成功。 如果这是不可取的,要么在通过 clear_exception_types() 停止之前删除错误处理,要么使用 :meth:`cancel 代替。

cancel()

取消内部任务,如果它正在运行。

restart(*args, **kwargs)

一种重新启动内部任务的便捷方法。

注解

由于这个函数的工作方式,任务不会像 start() 那样返回。

参数
  • *args – 要使用的参数。

  • **kwargs – 要使用的关键字参数。

add_exception_type(*exceptions)

添加要在重新连接逻辑期间处理的异常类型。 默认情况下,处理的异常类型是由 qq.Client.connect() 处理的异常类型,其中包括很多断网错误。 如果你正在与引发自己的一组异常的第 3 方库进行交互,则此函数很有用。

参数

*exceptions (Type[BaseException]) – 要处理的异常类的参数列表。

引发

TypeError – 传递的异常要么不是类,要么不是从 BaseException 继承的。

clear_exception_types()

删除所有处理的异常类型。

注解

这个操作显然是无法撤消的!

remove_exception_type(*exceptions)

删除在重新连接逻辑期间处理的异常类型。

参数

*exceptions (Type[BaseException]) – 要处理的异常类的参数列表。

返回

是否已成功删除所有异常。

返回类型

bool

get_task()

Optional[asyncio.Task]: 如果没有运行,则获取内部任务或 None

is_being_cancelled()

任务是否被取消。

failed()

bool: 内部任务是否失败。

is_running()

bool: 检查任务当前是否正在运行。

change_interval(*, seconds=0, minutes=0, hours=0, time=...)

更改时间的间隔。

参数
  • seconds (float) – 每次迭代之间的秒数。

  • minutes (float) – 每次迭代之间的分钟数。

  • hours (float) – 每次迭代之间的小时数。

  • time (Union[datetime.time, Sequence[datetime.time]]) –

    运行此循环的确切时间。应该传递非空列表或 datetime.time 的单个值。这不能与相对时间参数一起使用。

    注解

    重复次数将被忽略,并且只运行一次。

引发
  • ValueError – 给出了无效值。

  • TypeError – 传递了 time 参数的无效值,或者 time 参数与相对时间参数一起传递。

qq.ext.tasks.loop(*, seconds=..., minutes=..., hours=..., time=..., count=None, reconnect=True, loop=...)

使用可选的重新连接逻辑在后台为你安排任务的装饰器。装饰器返回一个 Loop

参数
  • seconds (float) – 每次迭代之间的秒数。

  • minutes (float) – 每次迭代之间的分钟数。

  • hours (float) – 每次迭代之间的小时数。

  • time (Union[datetime.time, Sequence[datetime.time]]) –

    运行此循环的确切时间。应该传递一个非空列表或一个 datetime.time 的值。支持时区。 如果没有给出时间的时区,则假定它代表 UTC 时间。这不能与相对时间参数结合使用。

    注解

    重复时间将被忽略,并且只运行一次。

  • count (Optional[int]) – 要执行的循环次数,如果它应该是无限循环,则为“无”。

  • reconnect (bool) – 是否使用类似于 qq.Client.connect 中使用的指数退避算法处理错误并重新启动任务。

  • loop (asyncio.AbstractEventLoop) – 用于注册任务的循环,如果没有给出,则默认为 asyncio.get_event_loop()

引发
  • ValueError – 给出了无效值。

  • TypeError – 该函数不是协程,传递的 time 参数的值无效,或者 time 参数与相对时间参数一起传递。