Util

class asyncpraw.models.util.BoundedSet(max_items: int)

A set with a maximum size that evicts the oldest items when necessary.

This class does not implement the complete set interface.

__contains__(item: Any) bool

Test if the BoundedSet contains item.

__init__(max_items: int)

Initialize a BoundedSet instance.

add(item: Any)

Add an item to the set discarding the oldest item if necessary.

class asyncpraw.models.util.ExponentialCounter(max_counter: int)

A class to provide an exponential counter with jitter.

__init__(max_counter: int)

Initialize an ExponentialCounter instance.

Parameters:

max_counter

The maximum base value.

Note

The computed value may be 3.125% higher due to jitter.

counter() int | float

Increment the counter and return the current value with jitter.

reset()

Reset the counter to 1.

asyncpraw.models.util.permissions_string(*, known_permissions: Set[str], permissions: List[str] | None) str

Return a comma separated string of permission changes.

Parameters:
  • known_permissions – A set of strings representing the available permissions.

  • permissions – A list of strings, or None. These strings can exclusively contain + or - prefixes, or contain no prefixes at all. When prefixed, the resulting string will simply be the joining of these inputs. When not prefixed, all permissions are considered to be additions, and all permissions in the known_permissions set that aren’t provided are considered to be removals. When None, the result is "+all".

async for ... in asyncpraw.models.util.stream_generator(function: Callable, *, attribute_name: str = 'fullname', exclude_before: bool = False, pause_after: int | None = None, skip_existing: bool = False, **function_kwargs: Any) AsyncGenerator[Any, None]

Yield new items from function as they become available.

Parameters:
  • function – A callable that returns a ListingGenerator, e.g., Subreddit.comments() or Subreddit.new().

  • attribute_name – The field to use as an ID (default: "fullname").

  • exclude_before – When True does not pass params to function (default: False).

  • pause_after – An integer representing the number of requests that result in no new items before this function yields None, effectively introducing a pause into the stream. A negative value yields None after items from a single response have been yielded, regardless of number of new items obtained in that response. A value of 0 yields None after every response resulting in no new items, and a value of None never introduces a pause (default: None).

  • skip_existing – When True, this does not yield any results from the first request thereby skipping any items that existed in the stream prior to starting the stream (default: False).

Additional keyword arguments will be passed to function.

Note

This function internally uses an exponential delay with jitter between subsequent responses that contain no new results, up to a maximum delay of just over 16 seconds. In practice, that means that the time before pause for pause_after=N+1 is approximately twice the time before pause for pause_after=N.

For example, to create a stream of comment replies, try:

reply_function = reddit.inbox.comment_replies
async for reply in asyncpraw.models.util.stream_generator(reply_function):
    print(reply)

To pause a comment stream after six responses with no new comments, try:

subreddit = await reddit.subreddit("test")
async for comment in subreddit.stream.comments(pause_after=6):
    if comment is None:
        break
    print(comment)

To resume fetching comments after a pause, try:

subreddit = await reddit.subreddit("test")
comment_stream = subreddit.stream.comments(pause_after=5)

async for comment in comment_stream:
    if comment is None:
        break
    print(comment)
# Do any other processing, then try to fetch more data
async for comment in comment_stream:
    if comment is None:
        break
    print(comment)

To bypass the internal exponential backoff, try the following. This approach is useful if you are monitoring a subreddit with infrequent activity, and you want to consistently learn about new items from the stream as soon as possible, rather than up to a delay of just over sixteen seconds.

subreddit = await reddit.subreddit("test")
async for comment in subreddit.stream.comments(pause_after=0):
    if comment is None:
        continue
    print(comment)