hikari.api.event_manager#

Core interface for components that manage events in the library.

EventManager #

Bases: ABC

Base interface for event manager implementations.

An event manager listens for hikari.events.base_events.Event objects and consumes raw event payloads, and is expected to invoke one or more corresponding event listeners where appropriate.

consume_raw_event abstractmethod #

consume_raw_event(
    event_name: str, shard: GatewayShard, payload: JSONObject
) -> None

Consume a raw event.

PARAMETER DESCRIPTION
event_name

The case-insensitive name of the event being triggered.

TYPE: str

shard

Object of the shard that received this event.

TYPE: GatewayShard

payload

Payload of the event being triggered.

TYPE: JSONObject

RAISES DESCRIPTION
LookupError

If there is no consumer for the event.
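
As a rough illustration of the lookup described above, the following sketch keeps consumers in a dict keyed by lower-cased event name. `RawConsumerTable`, `register`, and the `Payload` alias are hypothetical names used only for this example; they are not part of hikari, and a real implementation may resolve consumers differently.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-ins; hikari passes a GatewayShard and a JSONObject here.
Payload = Dict[str, Any]
Consumer = Callable[[object, Payload], None]


class RawConsumerTable:
    """Toy registry that maps raw event names to consumer callables."""

    def __init__(self) -> None:
        self._consumers: Dict[str, Consumer] = {}

    def register(self, event_name: str, consumer: Consumer) -> None:
        # Normalise the key so that lookups are case-insensitive.
        self._consumers[event_name.lower()] = consumer

    def consume_raw_event(self, event_name: str, shard: object, payload: Payload) -> None:
        try:
            consumer = self._consumers[event_name.lower()]
        except KeyError:
            # Raise the documented exception type with a readable message.
            raise LookupError(f"no consumer for event {event_name!r}") from None
        consumer(shard, payload)
```

Registering under "MESSAGE_CREATE" and consuming "message_create" reaches the same consumer, while an unregistered name raises LookupError.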

dispatch abstractmethod #

dispatch(event: Event) -> Future[Any]

Dispatch an event.

PARAMETER DESCRIPTION
event

The event to dispatch.

TYPE: Event

Examples:

We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.

    import attrs

    from hikari.traits import RESTAware
    from hikari.events.base_events import Event
    from hikari.users import User
    from hikari.snowflakes import Snowflake

    @attrs.define()
    class EveryoneMentionedEvent(Event):
        app: RESTAware = attrs.field()

        author: User = attrs.field()
        '''The user who mentioned everyone.'''

        content: str = attrs.field()
        '''The message that was sent.'''

        message_id: Snowflake = attrs.field()
        '''The message ID.'''

        channel_id: Snowflake = attrs.field()
        '''The channel ID.'''

We can then dispatch our event as we see fit.

    from hikari.events.messages import MessageCreateEvent

    @bot.listen(MessageCreateEvent)
    async def on_message(event):
        # content is None when the message has no text, so fall back to "".
        content = event.content or ""
        if "@everyone" in content or "@here" in content:
            mention_event = EveryoneMentionedEvent(
                app=bot,
                author=event.author,
                content=content,
                message_id=event.message_id,
                channel_id=event.channel_id,
            )

            bot.dispatch(mention_event)

This event can be listened to elsewhere by subscribing to it with hikari.api.event_manager.EventManager.subscribe.

    @bot.listen(EveryoneMentionedEvent)
    async def on_everyone_mentioned(event):
        print(event.author, "just pinged everyone in", event.channel_id)

RETURNS DESCRIPTION
Future[Any]

A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.

See Also

Listen : hikari.api.event_manager.EventManager.listen.
Stream : hikari.api.event_manager.EventManager.stream.
Subscribe : hikari.api.event_manager.EventManager.subscribe.
Unsubscribe : hikari.api.event_manager.EventManager.unsubscribe.
Wait_for : hikari.api.event_manager.EventManager.wait_for.
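
To illustrate the future returned by dispatch, here is a minimal sketch built on plain asyncio. It assumes listeners are stored per event class and that parent-class listeners are also invoked; `MiniDispatcher` is a hypothetical stand-in, not hikari's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Listener = Callable[[Any], Awaitable[None]]


class MiniDispatcher:
    """Toy dispatcher: listeners are keyed by event class."""

    def __init__(self) -> None:
        self._listeners: Dict[type, List[Listener]] = {}

    def subscribe(self, event_type: type, callback: Listener) -> None:
        self._listeners.setdefault(event_type, []).append(callback)

    def dispatch(self, event: object) -> "asyncio.Future[Any]":
        # Walk the MRO so listeners on parent event classes are invoked too.
        callbacks = [cb for cls in type(event).__mro__ for cb in self._listeners.get(cls, ())]
        # gather() schedules every listener immediately and returns a future,
        # so awaiting the result is optional. Call this from a running loop.
        return asyncio.gather(*(cb(event) for cb in callbacks))


async def demo() -> List[str]:
    calls: List[str] = []

    async def on_any(event: object) -> None:
        calls.append(f"object:{event}")

    async def on_str(event: str) -> None:
        calls.append(f"str:{event}")

    dispatcher = MiniDispatcher()
    dispatcher.subscribe(object, on_any)
    dispatcher.subscribe(str, on_str)
    await dispatcher.dispatch("ping")  # completes once both listeners have run
    return calls
```

Awaiting the returned future guarantees that every listener has finished; dropping it still lets the listeners run in the background.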

get_listeners abstractmethod #

get_listeners(
    event_type: Type[EventT], /, *, polymorphic: bool = True
) -> Collection[CallbackT[EventT]]

Get the listeners for a given event type, if there are any.

PARAMETER DESCRIPTION
event_type

The event type to look for. T must be a subclass of hikari.events.base_events.Event.

TYPE: Type[T]

polymorphic

If True, this will also return the listeners for every event type that dispatching an event_type instance would invoke (that is, event_type and its parent event classes). If False, then only listeners for this class specifically are returned. The default is True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Collection[Callable[[T], Coroutine[Any, Any, None]]]

A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
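
The effect of polymorphic can be sketched with a plain dict registry and the class MRO. The event classes and the free function below are hypothetical and exist only for this illustration; the sketch assumes that "polymorphic" means "include listeners registered on parent classes".

```python
from typing import Any, Callable, Dict, List, Tuple

Listener = Callable[[Any], Any]


class BaseEvent:
    """Hypothetical root event."""


class MessageEvent(BaseEvent):
    """Hypothetical intermediate event."""


class GuildMessageEvent(MessageEvent):
    """Hypothetical concrete event."""


def get_listeners(
    registry: Dict[type, List[Listener]], event_type: type, *, polymorphic: bool = True
) -> Tuple[Listener, ...]:
    if not polymorphic:
        # Only listeners registered for exactly this class.
        return tuple(registry.get(event_type, ()))
    found: List[Listener] = []
    for cls in event_type.__mro__:
        # Parent classes come later in the MRO, so their listeners follow.
        found.extend(registry.get(cls, ()))
    # A new tuple is returned, so callers never mutate the registry.
    return tuple(found)
```

With one listener on BaseEvent and one on GuildMessageEvent, a polymorphic lookup for GuildMessageEvent returns both, while polymorphic=False returns only the latter.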

listen abstractmethod #

listen(
    *event_types: Type[EventT],
) -> Callable[[CallbackT[EventT]], CallbackT[EventT]]

Generate a decorator to subscribe a callback to an event type.

This is a second-order decorator.

PARAMETER DESCRIPTION
*event_types

The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature. T must be a subclass of hikari.events.base_events.Event.

TYPE: Optional[Type[T]] DEFAULT: ()

RETURNS DESCRIPTION
Callable[[T], T]

A decorator for a coroutine function that passes it to hikari.api.event_manager.EventManager.subscribe before returning the function reference.

See Also

Dispatch : hikari.api.event_manager.EventManager.dispatch.
Stream : hikari.api.event_manager.EventManager.stream.
Subscribe : hikari.api.event_manager.EventManager.subscribe.
Unsubscribe : hikari.api.event_manager.EventManager.unsubscribe.
Wait_for : hikari.api.event_manager.EventManager.wait_for.
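
A "second-order decorator" is a function that returns a decorator. The sketch below shows the general shape, including inference of the event type from the first annotated parameter when no type is passed. `MiniManager`, `Event`, and `PingEvent` are hypothetical; whether and how a real implementation infers types is up to that implementation.

```python
import typing
from typing import Any, Callable, Dict, List

Callback = Callable[..., Any]


class Event:
    """Hypothetical base event."""


class PingEvent(Event):
    """Hypothetical concrete event."""


class MiniManager:
    def __init__(self) -> None:
        self.listeners: Dict[type, List[Callback]] = {}

    def subscribe(self, event_type: type, callback: Callback) -> None:
        self.listeners.setdefault(event_type, []).append(callback)

    def listen(self, *event_types: type) -> Callable[[Callback], Callback]:
        def decorator(callback: Callback) -> Callback:
            resolved = event_types
            if not resolved:
                # Fall back to the annotation of the first annotated parameter.
                hints = typing.get_type_hints(callback)
                hints.pop("return", None)
                if not hints:
                    raise TypeError("no event type given and no type hint to infer it from")
                resolved = (next(iter(hints.values())),)
            for event_type in resolved:
                self.subscribe(event_type, callback)
            # Return the original function so the decorated name still refers to it.
            return callback

        return decorator


manager = MiniManager()


@manager.listen(PingEvent)
async def explicit(event):
    ...


@manager.listen()
async def inferred(event: PingEvent) -> None:
    ...
```

Both callbacks end up subscribed to PingEvent, and both names still refer to the undecorated coroutine functions.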

stream abstractmethod #

stream(
    event_type: Type[EventT],
    /,
    timeout: Union[float, int, None],
    limit: Optional[int] = None,
) -> EventStream[EventT]

Return a stream iterator for the given event and sub-events.

Warning

If you use stream.open() to start the stream then you must also close it with stream.close(), otherwise it may queue events in memory indefinitely.

PARAMETER DESCRIPTION
event_type

The event type to listen for. This will listen for subclasses of this type additionally.

TYPE: Type[Event]

timeout

How long this streamer should wait for the next event before ending the iteration. If None then this will continue until explicitly broken from.

TYPE: Union[float, int, None]

limit

The limit for how many events this should queue at one time before dropping extra incoming events. Leave this as None for the queue size to be unlimited.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
EventStream[Event]

The async iterator to handle streamed events. This must be started using a "with stream:" block or stream.open() before asynchronously iterating over it.

Examples:

    with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
        async for user_id in stream.map("user_id").limit(50):
            ...

or using open() and close():

    stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
    stream.open()

    async for user_id in stream.map("user_id").limit(50):
        ...

    stream.close()

See Also

Dispatch : hikari.api.event_manager.EventManager.dispatch.
Listen : hikari.api.event_manager.EventManager.listen.
Subscribe : hikari.api.event_manager.EventManager.subscribe.
Unsubscribe : hikari.api.event_manager.EventManager.unsubscribe.
Wait_for : hikari.api.event_manager.EventManager.wait_for.

subscribe abstractmethod #

subscribe(event_type: Type[Any], callback: CallbackT[Any]) -> None

Subscribe a given callback to a given event type.

PARAMETER DESCRIPTION
event_type

The event type to listen for. This will also listen for any subclasses of the given type. T must be a subclass of hikari.events.base_events.Event.

TYPE: Type[T]

callback

Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.

TYPE: CallbackT[Any]

Examples:

The following demonstrates subscribing a callback to message creation events.

    from hikari.events.messages import MessageCreateEvent

    async def on_message(event):
        ...

    bot.subscribe(MessageCreateEvent, on_message)

See Also

Dispatch : hikari.api.event_manager.EventManager.dispatch.
Listen : hikari.api.event_manager.EventManager.listen.
Stream : hikari.api.event_manager.EventManager.stream.
Unsubscribe : hikari.api.event_manager.EventManager.unsubscribe.
Wait_for : hikari.api.event_manager.EventManager.wait_for.

unsubscribe abstractmethod #

unsubscribe(event_type: Type[Any], callback: CallbackT[Any]) -> None

Unsubscribe a given callback from a given event type, if present.

PARAMETER DESCRIPTION
event_type

The event type to unsubscribe from. This must be exactly the same type that the callback was originally subscribed with, otherwise the callback will not be removed. T must derive from hikari.events.base_events.Event.

TYPE: Type[T]

callback

The callback to unsubscribe.

TYPE: CallbackT[Any]

Examples:

The following demonstrates unsubscribing a callback from a message creation event.

    from hikari.events.messages import MessageCreateEvent

    async def on_message(event):
        ...

    bot.unsubscribe(MessageCreateEvent, on_message)

See Also

Dispatch : hikari.api.event_manager.EventManager.dispatch.
Listen : hikari.api.event_manager.EventManager.listen.
Stream : hikari.api.event_manager.EventManager.stream.
Subscribe : hikari.api.event_manager.EventManager.subscribe.
Wait_for : hikari.api.event_manager.EventManager.wait_for.

wait_for abstractmethod async #

wait_for(
    event_type: Type[EventT],
    /,
    timeout: Union[float, int, None],
    predicate: Optional[PredicateT[EventT]] = None,
) -> EventT

Wait for a given event to occur once, then return the event.

Warning

Async predicates are not supported.

PARAMETER DESCRIPTION
event_type

The event type to listen for. This will listen for subclasses of this type additionally.

TYPE: Type[Event]

predicate

A function taking the event as the single parameter. This should return True if the event is one you want to return, or False if the event should not be returned. If left as None (the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.

TYPE: Optional[PredicateT[EventT]] DEFAULT: None

timeout

The number of seconds to wait before giving up and raising an asyncio.TimeoutError. If None, this waits indefinitely. Waiting without a timeout is not recommended, because calling this in an uncontrolled way can "leak" coroutines that never complete.

TYPE: Union[float, int, None]

RETURNS DESCRIPTION
Event

The first received event of the given type (or a subtype) that the predicate, if any, returned True for.

RAISES DESCRIPTION
asyncio.TimeoutError

If the timeout is not None and is reached before an event is received that the predicate returns True for.

See Also

Dispatch : hikari.api.event_manager.EventManager.dispatch.
Listen : hikari.api.event_manager.EventManager.listen.
Stream : hikari.api.event_manager.EventManager.stream.
Subscribe : hikari.api.event_manager.EventManager.subscribe.
Unsubscribe : hikari.api.event_manager.EventManager.unsubscribe.
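
The interplay of predicate and timeout in wait_for can be sketched with a future per waiter. `MiniWaiter` and its `dispatch` method are hypothetical helpers for this example only; the sketch assumes a synchronous predicate, in line with the warning above.

```python
import asyncio
from typing import Any, Callable, List, Optional, Tuple

Predicate = Callable[[Any], bool]
Waiter = Tuple[type, Optional[Predicate], asyncio.Future]


class MiniWaiter:
    def __init__(self) -> None:
        self._waiters: List[Waiter] = []

    def dispatch(self, event: object) -> None:
        # Resolve every waiter whose type and predicate accept this event.
        for entry in list(self._waiters):
            event_type, predicate, future = entry
            if isinstance(event, event_type) and (predicate is None or predicate(event)):
                self._waiters.remove(entry)
                if not future.done():
                    future.set_result(event)

    async def wait_for(
        self, event_type: type, /, timeout: Optional[float], predicate: Optional[Predicate] = None
    ) -> Any:
        future = asyncio.get_running_loop().create_future()
        entry = (event_type, predicate, future)
        self._waiters.append(entry)
        try:
            # Raises asyncio.TimeoutError if nothing matches in time.
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            # Always deregister, so timed-out waiters do not accumulate.
            if entry in self._waiters:
                self._waiters.remove(entry)


async def demo() -> Tuple[int, bool]:
    waiter = MiniWaiter()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, waiter.dispatch, 3)   # rejected by the predicate
    loop.call_later(0.02, waiter.dispatch, 42)  # accepted
    matched = await waiter.wait_for(int, timeout=1.0, predicate=lambda n: n > 10)
    try:
        await waiter.wait_for(str, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return matched, timed_out
```

The first wait skips the event that fails the predicate and returns the second one; the second wait never sees a matching event and times out.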

EventStream #

Bases: LazyIterator[EventT], ABC

A base abstract class for all event streamers.

Unlike hikari.iterators.LazyIterator (which this extends), an event streamer must be started and closed.

Examples:

A streamer may either be started and closed using with syntax where hikari.api.event_manager.EventStream.open and hikari.api.event_manager.EventStream.close are implicitly called based on context.

    with EventStream(app, EventType, timeout=50) as stream:
        async for entry in stream:
            ...

A streamer may also be started and closed directly using hikari.api.event_manager.EventStream.open and hikari.api.event_manager.EventStream.close. Note that if you don't call hikari.api.event_manager.EventStream.close once you have finished with an opened streamer, it may queue events in memory indefinitely.

    stream = EventStream(app, EventType, timeout=50)
    stream.open()
    async for event in stream:
        ...

    stream.close()

See Also

LazyIterator : hikari.iterators.LazyIterator.
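
The open/close lifecycle, the queue limit, and the iteration timeout can be pictured with a small asyncio-based stand-in. `MiniStream` and its `push` method are hypothetical and only mimic the documented behaviour; this is not how hikari implements streamers.

```python
import asyncio
from typing import Any, List, Optional


class MiniStream:
    """Toy streamer: buffers pushed events while open, dropping extras beyond limit."""

    def __init__(self, timeout: Optional[float], limit: Optional[int] = None) -> None:
        self._timeout = timeout
        self._limit = limit
        self._queue: asyncio.Queue = asyncio.Queue()
        self._active = False

    def open(self) -> None:
        self._active = True

    def close(self) -> None:
        self._active = False
        self._queue = asyncio.Queue()  # discard anything still buffered

    def __enter__(self) -> "MiniStream":
        self.open()
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()

    def push(self, event: Any) -> None:
        # Events are ignored while closed and dropped once the buffer is full.
        if not self._active:
            return
        if self._limit is not None and self._queue.qsize() >= self._limit:
            return
        self._queue.put_nowait(event)

    def __aiter__(self) -> "MiniStream":
        return self

    async def __anext__(self) -> Any:
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=self._timeout)
        except asyncio.TimeoutError:
            # No event arrived within the timeout: end the iteration.
            raise StopAsyncIteration from None


async def demo() -> List[int]:
    received: List[int] = []
    with MiniStream(timeout=0.05, limit=2) as stream:
        for n in range(5):
            stream.push(n)  # only the first two fit in the buffer
        async for event in stream:
            received.append(event)
    return received
```

Because the buffer only grows while the streamer is open, forgetting to close it is what allows events to pile up in memory.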

awaiting #

awaiting(window_size: int = 10) -> LazyIterator[ValueT]

Await each item concurrently in a fixed size window.

Warning

Setting a large window size, or setting it to 0 to await everything is a dangerous thing to do if you are making API calls. Some endpoints will get ratelimited and cause a backup of waiting tasks, others may begin to spam global rate limits instead (the fetch_user endpoint seems to be notorious for doing this).

Note

This call assumes that the iterator contains awaitable values as input. MyPy cannot detect this nicely, so any cast is forced internally. If the item is not awaitable, you will receive a TypeError instead. You have been warned. You cannot escape the ways of the duck type young grasshopper.

PARAMETER DESCRIPTION
window_size

The window size of how many tasks to await at once. You can set this to 0 to await everything at once, but see the warning above.

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
LazyIterator[ValueT]

The new lazy iterator to return.
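
The windowing idea can be sketched as an async generator that gathers a fixed number of awaitables at a time. The free function `awaiting` below is a hypothetical illustration that mirrors the documented parameter, including the "0 means everything at once" case; it is not the library's code.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Iterable, List, TypeVar

T = TypeVar("T")


async def awaiting(items: Iterable[Awaitable[T]], window_size: int = 10) -> AsyncIterator[T]:
    """Await items concurrently, at most window_size at a time, preserving order."""
    window: List[Awaitable[T]] = []
    for item in items:
        window.append(item)
        if window_size > 0 and len(window) >= window_size:
            for result in await asyncio.gather(*window):
                yield result
            window = []
    # Flush the final partial window (or everything, when window_size is 0).
    if window:
        for result in await asyncio.gather(*window):
            yield result


async def demo() -> List[int]:
    async def double(n: int) -> int:
        await asyncio.sleep(0)
        return n * 2

    return [value async for value in awaiting((double(n) for n in range(5)), window_size=2)]
```

A small window bounds how many requests are in flight at once, which is why a large or unlimited window is risky against rate-limited endpoints.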

chunk #

chunk(chunk_size: int) -> LazyIterator[Sequence[ValueT]]

Return results in chunks of up to chunk_size amount of entries.

PARAMETER DESCRIPTION
chunk_size

The limit for how many results should be returned in each chunk.

TYPE: int

RETURNS DESCRIPTION
LazyIterator[Sequence[ValueT]]

hikari.iterators.LazyIterator that emits each chunked sequence.
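
A minimal sketch of chunking over any async iterator follows. The free functions `chunk` and `numbers` are hypothetical helpers for illustration; note that the last chunk may be shorter than chunk_size.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def chunk(source: AsyncIterator[T], chunk_size: int) -> AsyncIterator[List[T]]:
    buffer: List[T] = []
    async for item in source:
        buffer.append(item)
        if len(buffer) == chunk_size:
            yield buffer
            buffer = []
    # Emit whatever is left over as a final, shorter chunk.
    if buffer:
        yield buffer


async def numbers(count: int) -> AsyncIterator[int]:
    for n in range(count):
        yield n


async def demo() -> List[List[int]]:
    return [block async for block in chunk(numbers(7), 3)]
```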

close abstractmethod #

close() -> None

Mark this streamer as closed to stop it from queueing and receiving events.

If called on an already closed streamer then this will do nothing.

Note

A "with streamer:" block may be used as a short-cut for opening and closing a streamer.

collect async #

collect(
    collector: Callable[[Sequence[ValueT]], Collection[ValueT]]
) -> Collection[ValueT]

Collect the results into a given type and return it.

PARAMETER DESCRIPTION
collector

A function that consumes a sequence of values and returns a collection.

TYPE: Callable[[Sequence[ValueT]], Collection[ValueT]]

count async #

count() -> int

Count the number of results.

RETURNS DESCRIPTION
int

Number of results found.

enumerate #

enumerate(*, start: int = 0) -> LazyIterator[Tuple[int, ValueT]]

Enumerate the paginated results lazily.

This behaves as an asyncio-friendly version of enumerate which uses much less memory than collecting all the results first and calling enumerate across them.

PARAMETER DESCRIPTION
start

Optional int to start at. If omitted, this is 0.

TYPE: int DEFAULT: 0

Examples:

    >>> async for i, item in paginated_results.enumerate():
    ...    print(i, item)
    0 foo
    1 bar
    2 baz
    3 bork
    4 qux

    >>> async for i, item in paginated_results.enumerate(start=9):
    ...    print(i, item)
    9 foo
    10 bar
    11 baz
    12 bork
    13 qux

    >>> async for i, item in paginated_results.enumerate(start=9).limit(3):
    ...    print(i, item)
    9 foo
    10 bar
    11 baz

RETURNS DESCRIPTION
LazyIterator[Tuple[int, ValueT]]

A paginated results view that asynchronously yields an increasing counter in a tuple with each result, lazily.

filter abstractmethod #

filter(
    *predicates: Union[Tuple[str, Any], Callable[[EventT], bool]], **attrs: Any
) -> Self

Filter the items by one or more conditions.

Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.

All conditions must evaluate to True for the item to be returned. Otherwise the item is discarded, and the next matching item is returned instead, if there is one.

PARAMETER DESCRIPTION
*predicates

Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.filter(("user.bot", True)).

TYPE: Union[Callable[[ValueT], bool], Tuple[str, Any]] DEFAULT: ()

**attrs

Alternative to passing 2-tuples. Cannot specify nested attributes using this method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
EventStream[ValueT]

The current stream with the new filter applied.
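
The three ways of expressing a condition (callables, 2-tuples with optionally nested attribute names, and keyword arguments) can be combined into a single predicate. The sketch below uses operator.attrgetter, which resolves dotted names; `build_predicate` and the sample objects are hypothetical and only illustrate the matching rules described above.

```python
import operator
from types import SimpleNamespace
from typing import Any, Callable, List, Tuple, Union

Condition = Union[Tuple[str, Any], Callable[[Any], bool]]


def build_predicate(*predicates: Condition, **attrs: Any) -> Callable[[Any], bool]:
    checks: List[Callable[[Any], bool]] = []
    for predicate in predicates:
        if isinstance(predicate, tuple):
            name, expected = predicate
            # attrgetter("user.bot") follows the dots through nested attributes.
            getter = operator.attrgetter(name)
            checks.append(lambda item, g=getter, e=expected: g(item) == e)
        else:
            checks.append(predicate)
    for name, expected in attrs.items():
        # Keyword form: top-level attributes only.
        checks.append(lambda item, n=name, e=expected: getattr(item, n) == e)
    # Every condition must hold for the item to pass.
    return lambda item: all(check(item) for check in checks)


members = [
    SimpleNamespace(name="a", user=SimpleNamespace(bot=True)),
    SimpleNamespace(name="b", user=SimpleNamespace(bot=False)),
]
is_bot = build_predicate(("user.bot", True))
human_named_b = build_predicate(lambda member: not member.user.bot, name="b")
```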

flat_map #

flat_map(
    flattener: _FlattenerT[ValueT, AnotherValueT]
) -> LazyIterator[AnotherValueT]

Perform a flat mapping operation.

This will pass each item in the iterator to the given function parameter, expecting a new typing.Iterable or typing.AsyncIterator to be returned as the result. This means you can map to a new hikari.iterators.LazyIterator, typing.AsyncIterator, typing.Iterable, async generator, or generator.

Remember that typing.Iterator implicitly provides typing.Iterable compatibility.

This is used to provide lazy conversions, and can be used to implement reactive-like pipelines if desired.

All results are combined into one large lazy iterator and yielded lazily.

PARAMETER DESCRIPTION
flattener

A function that returns either an async iterator or iterator of new values. Could be an attribute name instead.

TYPE: _FlattenerT[ValueT, AnotherValueT]

Examples:

The following example collects the distinct set of users mentioned in the last 500 messages of the given channel.

    import re
    import typing

    import hikari
    from hikari.snowflakes import Snowflake

    def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
        # content is None for messages without text.
        for match in re.findall(r"<@!?(\d+)>", message.content or ""):
            yield Snowflake(match)

    mentioned_users = await (
        channel
        .fetch_history()
        .limit(500)
        .flat_map(iter_mentioned_users)
        .collect(set)
    )

RETURNS DESCRIPTION
LazyIterator[AnotherValueT]

The new lazy iterator to return.
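
As a library-independent sketch of the same operation, the async generator below accepts a flattener that returns either a regular iterable or an async iterator and yields the combined results lazily. The free function `flat_map` and the sample data are hypothetical.

```python
import asyncio
import re
from typing import Any, AsyncIterator, Callable, List


async def flat_map(source: AsyncIterator[Any], flattener: Callable[[Any], Any]) -> AsyncIterator[Any]:
    async for item in source:
        result = flattener(item)
        if hasattr(result, "__aiter__"):
            # The flattener returned an async iterator or async generator.
            async for sub_item in result:
                yield sub_item
        else:
            # The flattener returned a regular iterable or generator.
            for sub_item in result:
                yield sub_item


async def contents() -> AsyncIterator[str]:
    for text in ("<@1> hi <@!2>", "no mentions", "<@2>"):
        yield text


def iter_mentions(text: str) -> List[str]:
    return re.findall(r"<@!?(\d+)>", text)


async def demo() -> List[str]:
    return [user_id async for user_id in flat_map(contents(), iter_mentions)]
```

Items that flatten to nothing simply contribute no values, and duplicates are kept unless they are removed in a later step.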

flatten #

Flatten an async iterator of iterables.

for_each async #

for_each(consumer: Callable[[ValueT], Any]) -> None

Forward each value to a given consumer immediately.

last async #

last() -> ValueT

Return the last element of this iterator only.

Note

This method will consume the whole iterator if run.

RETURNS DESCRIPTION
ValueT

The last result.

RAISES DESCRIPTION
LookupError

If no result exists.

limit #

limit(limit: int) -> LazyIterator[ValueT]

Limit the number of items you receive from this async iterator.

PARAMETER DESCRIPTION
limit

The number of items to get. This must be greater than zero.

TYPE: int

Examples:

    >>> async for item in paginated_results.limit(3):
    ...     print(item)

RETURNS DESCRIPTION
LazyIterator[ValueT]

A paginated results view that asynchronously yields a maximum of the given number of items before completing.

map #

map(
    transformation: Union[Callable[[ValueT], AnotherValueT], str]
) -> LazyIterator[AnotherValueT]

Map the values to a different value.

PARAMETER DESCRIPTION
transformation

The function to use to map each value. This may alternatively be a string attribute name to replace the input value with. You can provide nested attributes using the . operator.

TYPE: Union[Callable[[ValueT], AnotherValueT], str]

RETURNS DESCRIPTION
LazyIterator[AnotherValueT]

hikari.iterators.LazyIterator that maps each value to another value.

next async #

next() -> ValueT

Return the next element of this iterator only.

RETURNS DESCRIPTION
ValueT

The next result.

RAISES DESCRIPTION
LookupError

If no more results exist.

open abstractmethod #

open() -> None

Mark this streamer as opened to let it start receiving and queueing events.

If called on an already started streamer then this will do nothing.

Note

A "with streamer:" block may be used as a short-cut for opening and closing a streamer.

reversed #

reversed() -> LazyIterator[ValueT]

Return a lazy iterator of the remainder of this iterator's values reversed.

RETURNS DESCRIPTION
LazyIterator[ValueT]

The lazy iterator of this iterator's remaining values reversed.

skip #

skip(number: int) -> LazyIterator[ValueT]

Drop the given number of items, then yield anything after.

PARAMETER DESCRIPTION
number

The max number of items to drop before any items are yielded.

TYPE: int

RETURNS DESCRIPTION
LazyIterator[ValueT]

A paginated results view that asynchronously yields all items AFTER the given number of items are discarded first.

skip_until #

skip_until(
    *predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]

Discard items while all conditions are False.

Items after this will be yielded as normal.

PARAMETER DESCRIPTION
*predicates

Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.skip_until(("user.bot", True)).

TYPE: Union[Callable[[ValueT], bool], Tuple[str, Any]] DEFAULT: ()

**attrs

Alternative to passing 2-tuples. Cannot specify nested attributes using this method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
LazyIterator[ValueT]

LazyIterator that only emits values once a condition has been met. All items before this are discarded.

skip_while #

skip_while(
    *predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]

Discard items while all conditions are True.

Items after this will be yielded as normal.

PARAMETER DESCRIPTION
*predicates

Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.skip_while(("user.bot", True)).

TYPE: Union[Callable[[ValueT], bool], Tuple[str, Any]] DEFAULT: ()

**attrs

Alternative to passing 2-tuples. Cannot specify nested attributes using this method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
LazyIterator[ValueT]

LazyIterator that only emits values once a condition has failed. All items before this are discarded.
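
The difference between skip_while and skip_until is easiest to see side by side. The sketch below uses a single predicate callable for brevity; the free functions and `from_iterable` are hypothetical helpers, not library code. Once skipping stops, later items are yielded even if they would have matched the skip condition.

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")


async def from_iterable(values: Iterable[T]) -> AsyncIterator[T]:
    for value in values:
        yield value


async def skip_while(source: AsyncIterator[T], predicate: Callable[[T], bool]) -> AsyncIterator[T]:
    skipping = True
    async for item in source:
        if skipping and predicate(item):
            continue  # still inside the leading run of matching items
        skipping = False
        yield item


async def skip_until(source: AsyncIterator[T], predicate: Callable[[T], bool]) -> AsyncIterator[T]:
    # Skipping until a condition holds is skipping while it does not hold.
    async for item in skip_while(source, lambda value: not predicate(value)):
        yield item


async def demo() -> Tuple[List[int], List[int]]:
    data = [1, 2, 5, 1, 6]
    after_small = [n async for n in skip_while(from_iterable(data), lambda n: n < 3)]
    from_big = [n async for n in skip_until(from_iterable(data), lambda n: n > 5)]
    return after_small, from_big
```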

sort async #

sort(*, key: Any = None, reverse: bool = False) -> Sequence[ValueT]

Collect all results, then sort the collection before returning it.

take_until #

take_until(
    *predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]

Return each item until any conditions pass or the end is reached.

PARAMETER DESCRIPTION
*predicates

Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.take_until(("user.bot", True)).

TYPE: Union[Callable[[ValueT], bool], Tuple[str, Any]] DEFAULT: ()

**attrs

Alternative to passing 2-tuples. Cannot specify nested attributes using this method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
LazyIterator[ValueT]

LazyIterator that only emits values until any conditions are matched.

take_while #

take_while(
    *predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]

Return each item until any conditions fail or the end is reached.

PARAMETER DESCRIPTION
*predicates

Predicates to invoke. These are functions that take a value and return True if it is of interest, or False otherwise. These may instead include 2-tuple objects consisting of a str attribute name (nested attributes are referred to using the . operator), and values to compare for equality. This allows you to specify conditions such as members.take_while(("user.bot", True)).

TYPE: Union[Callable[[ValueT], bool], Tuple[str, Any]] DEFAULT: ()

**attrs

Alternative to passing 2-tuples. Cannot specify nested attributes using this method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
LazyIterator[ValueT]

LazyIterator that only emits values until any conditions are not matched.
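
take_while and take_until mirror the skip variants: they yield the leading items and stop at the first item that ends the run. The sketch below again uses a single predicate callable, and the free functions and `from_iterable` are hypothetical helpers for illustration only.

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")


async def from_iterable(values: Iterable[T]) -> AsyncIterator[T]:
    for value in values:
        yield value


async def take_while(source: AsyncIterator[T], predicate: Callable[[T], bool]) -> AsyncIterator[T]:
    async for item in source:
        if not predicate(item):
            break  # the first failing item ends the iteration for good
        yield item


async def take_until(source: AsyncIterator[T], predicate: Callable[[T], bool]) -> AsyncIterator[T]:
    # Taking until a condition holds is taking while it does not hold.
    async for item in take_while(source, lambda value: not predicate(value)):
        yield item


async def demo() -> Tuple[List[int], List[int]]:
    data = [1, 2, 5, 1, 6]
    before_six = [n async for n in take_while(from_iterable(data), lambda n: n != 6)]
    before_big = [n async for n in take_until(from_iterable(data), lambda n: n > 2)]
    return before_six, before_big
```

Unlike filter, neither function resumes after the stopping item, so the trailing 1 in the sample data is not returned by take_until.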