Source code for mastodon.async_refresh

# async_refresh.py - async refresh endpoints and utilities

import time
import copy

from mastodon.utility import api_version
from mastodon.errors import MastodonIllegalArgumentError, MastodonAPIError
from mastodon.internals import Mastodon as Internals
from mastodon.return_types import AsyncRefresh
from mastodon.types_base import AttribAccessDict, IdType, try_cast_recurse
from typing import Optional, Union, Tuple

class Mastodon(Internals):
    ###
    # Reading data: Async refreshes
    ###
[docs] def get_async_refresh_info(self, result) -> Optional[Tuple[AsyncRefresh, int]]: """ Extract async refresh information from an API result, if present. Returns a tuple of (:class:`AsyncRefresh`, retry_seconds) where the entity contains the ``id``, ``status`` (always ``"running"``), and optionally ``result_count``, and retry_seconds is the server-suggested polling interval in seconds. Returns None if the result has no async refresh information. """ raw = getattr(result, '_async_refresh', None) if raw is not None: entity = try_cast_recurse(AsyncRefresh, { 'id': raw['id'], 'status': 'running', 'result_count': raw.get('result_count'), }) return (entity, raw.get('retry', 3)) return None
[docs] @api_version("4.4.0", "4.4.0") def get_async_refresh_status(self, result_or_id: Union[IdType, AsyncRefresh, AttribAccessDict]) -> AsyncRefresh: """ Get the status of an async refresh by its ID. The ID can be obtained from a previous API response that included the ``Mastodon-Async-Refresh`` header, accessible via :meth:`get_async_refresh_info`. You can pass in an async refresh ID, an :class:`AsyncRefresh` entity (e.g. from a previous call to this function or from :meth:`get_async_refresh_info`), or an API result that has async refresh information (i.e. a previous API result that had the header set). Returns an :class:`AsyncRefresh` dict. """ async_refresh_id = self.__get_async_refresh_id(result_or_id) response = self.__api_request('GET', f'/api/v1_alpha/async_refreshes/{async_refresh_id}', override_type=dict) if isinstance(response, dict) and 'async_refresh' in response: response = response['async_refresh'] return try_cast_recurse(AsyncRefresh, response)
[docs] @api_version("4.4.0", "4.4.0") def await_async_refresh(self, result, timeout: float = 0.0, max_attempts: int = -1) -> Optional[AttribAccessDict]: """ Wait for an async refresh to finish, then re-fetch and return the original resource. Polls the async refresh endpoint with backoff as indicated by the server's ``retry`` hint. Once the refresh is ``finished``, re-issues the original API request and returns the refreshed result entity. `result` should be a previous API result that has async refresh information (i.e. the server returned a ``Mastodon-Async-Refresh`` header with that response). If no such information is present, or the refresh info indicates the refresh is already finished, this function will return the original result as is immediately. `timeout` is the maximum total time in seconds to wait for the async refresh to complete. Set to 0 for no timeout. Default is 0 (wait until done). `max_attempts` is the maximum number of polling attempts. Set to 0 or negative for no limit. Default is -1 (wait until done). Returns the re-fetched original entity on success, or None if the timeout or max attempts was exceeded before the refresh finished. Raises `MastodonIllegalArgumentError` if the passed object has async refresh information but is missing the original request information needed to re-fetch. Raises `MastodonAPIError` if any of the API requests made during the process fail with an error response. """ async_refresh_info = getattr(result, '_async_refresh', None) if async_refresh_info is None: if not isinstance(result, (AttribAccessDict, list)): raise MastodonIllegalArgumentError("await_async_refresh expects an API result entity.") # no async refresh info -> just return right away return result else: # if we do have it, make sure it has the original request information we need to re-fetch later if '_method' not in async_refresh_info: raise MastodonIllegalArgumentError("The provided result's async refresh information is missing the original request information.") # already done -> just return right away if async_refresh_info.get('status') == 'finished': return result async_refresh_id = async_refresh_info['id'] retry_seconds = async_refresh_info.get('retry', 3) start_time = time.monotonic() attempts = 0 refresh_result = None while attempts < max_attempts or max_attempts <= 0: if timeout > 0 and (time.monotonic() - start_time) >= timeout: break if attempts > 0: wait = min(retry_seconds, timeout - (time.monotonic() - start_time)) if timeout > 0 else retry_seconds # Make sure wait is maximum 5 minutes to avoid hangs in case server is being silly wait = min(wait, 300) if wait > 0: time.sleep(wait) refresh_result = self.get_async_refresh_status(async_refresh_id) attempts += 1 if refresh_result.status == 'finished': # Re-fetch the original endpoint method = async_refresh_info['_method'] endpoint = async_refresh_info['_endpoint'] params = copy.deepcopy(async_refresh_info.get('_params', {})) response_type = async_refresh_info.get('_mastopy_type', None) return self.__api_request(method, endpoint, params, override_type=response_type) # Use retry hint from the polled response's header if available polled_raw = getattr(refresh_result, '_async_refresh', None) if polled_raw is not None: retry_seconds = polled_raw.get('retry', retry_seconds) return None
def __get_async_refresh_id(self, result_or_id): """ Internal helper: extract async refresh ID from an ID value or a result object. """ if isinstance(result_or_id, (str, int)): return result_or_id if isinstance(result_or_id, AsyncRefresh): return result_or_id['id'] raw = getattr(result_or_id, '_async_refresh', None) if raw is not None: return raw['id'] raise MastodonIllegalArgumentError("Pass either an async refresh ID, an AsyncRefresh entity, or an API result with async refresh information.")