API Reference

This API reference within the AQIS documentation describes the AQIS components (cf. Component Overview) in detail.

Default Connections

The DEFAULT_CONNECTIONS dictionary is a key part of AQIS that defines the default connection settings for the Dask cluster. This dictionary specifies connection details like the host, port, and extra configurations that define the Dask cluster’s resource usage, such as number of workers, threads per worker, memory limits, and more. This setup ensures AQIS can interact with a Dask cluster in a flexible manner, either locally or remotely.

Note: The default connection provided in DEFAULT_CONNECTIONS is a local Dask cluster configuration. If the execution mode is set to remote, users can provide their own Dask connection details.

Imports:

from aqis import DEFAULT_CONNECTIONS

Dictionary Definition:

DEFAULT_CONNECTIONS[...]
Key / Parameter Description
'dask_default' The key in the dictionary and also the connection ID.
host='localhost' Dask scheduler is expected to run on the local machine.
port=41453 Port on which the Dask scheduler is listening.
conn_type='tcp' Connection type – here it’s TCP/IP (suitable for Dask).
extra={...} Additional parameters passed to configure the Dask cluster.

extra Fields in DEFAULT_CONNECTIONS

Field Purpose
n_workers=4 Number of Dask worker processes to spin up.
threads_per_worker=1 Each worker will use a single thread (helps with isolation and debugging).
memory_limit='1GiB' Each worker is restricted to using 1 GiB of RAM.
processes Controls whether workers are spawned as processes (True) or threads (False).
This is set to False on macOS for compatibility, and True elsewhere by default.

Example Usage: Default Connection Setup

# Define the default set of connections used by AQIS.
# This dictionary includes a Dask connection configuration under the key 'dask_default'.
# The Dask client is expected to run locally (localhost) on port 41453 using TCP.
# Additional configuration is provided via the `extra` dictionary:
# - n_workers: Number of Dask worker processes to start.
# - threads_per_worker: Each worker will use 1 thread.
# - memory_limit: Each worker is limited to 1 GiB of memory.
# - processes: On macOS, use threads instead of processes for compatibility (False); otherwise, use processes (True).
DEFAULT_CONNECTIONS = {
    'dask_default': Connection(
        'dask_default',
        host='localhost',
        port=41453,
        conn_type='tcp',
        extra={
            'n_workers': 4,              # Number of workers
            'threads_per_worker': 1,     # Threads per worker
            'memory_limit': '1GiB',      # Memory limit per worker
            'processes': False if platform.system() == 'Darwin' else True  # Use threads on macOS
        }
    )
}

Execution Mode

The ExecutionMode class is an Enum that defines the two available modes of execution for AQIS. This abstraction allows developers to easily toggle between development and production/distributed environments without changing the logic of their operations.

  • LOCAL: Executes tasks on a local Dask cluster.
  • REMOTE: Executes tasks on a remote Dask cluster.

Imports:

from aqis import ExecutionMode

Class Definition:

class ExecutionMode(enum)

Enum Values:

Mode Value Description
LOCAL 'local' Use a local Dask cluster for execution
REMOTE 'remote' Use a remote Dask cluster for execution

Example Usage: Setting Execution Mode

from aqis import ExecutionMode

# Create AQIS instance with remote mode
aqis = AQIS(mode=ExecutionMode.REMOTE)

AQIS Dataframe

The AQISDataFrame class is an abstraction over Dask and Pandas DataFrames. It allows users to load data from various sources (such as CSV files or Pandas DataFrames) into Dask DataFrames, enabling distributed data processing.

Imports:

from aqis import AQISDataFrame

Constructor:

class AQISDataFrame():
    def __init__(self, 
        data=None, 
        npartitions=4, 
        connection=None, 
        client=None):
Method Parameters Description
__init__ data=None
npartitions=4
connection=None
client=None
Initializes the AQISDataFrame with either a Dask DataFrame, Pandas DataFrame, or CSV path.

Example Usage: Instantiate AQIS Dataframe Object

from aqis import AQISDataFrame  # Adjust based on your module path

# Load data from CSV
aqis_df = AQISDataFrame(data="sample_data.csv")

# Access the internal Dask DataFrame
dask_df = aqis_df.get_dask_df()
print(dask_df.head())

AQIS Engine - Dask Interface

The AQIS interface class is the central interface for interacting with the AQIS Engine - Dask. It manages Dask cluster configurations (local or remote), handles execution modes, and provides utility methods to submit distributed tasks and expose APIs.

Imports:

from aqis import AQIS

Constructor:

class AQIS:
    def __init__(self, 
        mode=ExecutionMode.LOCAL, 
        connections=DEFAULT_CONNECTIONS, 
        dask_connection_id='dask_default'):
Method Parameters Description
__init__ mode=ExecutionMode.LOCAL
connections=DEFAULT_CONNECTIONS
dask_connection_id='dask_default'
Initializes the AQIS object with execution mode, connections, and options. Based on the mode, the cluster is initialized accordingly.

Class Methods:

start_rest_api(self, 
    python_callables: List[Callable] | None = None, 
    address=None, 
    port: int = 8000
)
Method Parameters Description
start_rest_api python_callables:List[Callable]|None=None
address = None
port: int = 8000
Starts a REST API server exposing the given Python callables. Defaults to port 8000 on all interfaces if not specified.
execute_simple(self, 
    python_callable: Callable, 
    *args, 
    print_result: bool = False, 
    **kwargs
)
Method Parameters Description
execute_simple python_callable:Callable
*args
print_result:bool=False
**kwargs
Executes a given Python callable with optional arguments. If print_result is set to True, the result is printed.

Example usage: Start REST API

def start_rest_api(self, 
    python_callables: List[Callable] | None = None, 
    address=None, 
    port: int = 8000):
    """
    Starts a Tornado REST API server to allow Dask functions to be invoked remotely.

    Parameters:
    - python_callables: A list of Python callable functions that will be exposed as REST endpoints.
    - address: The address to bind the server to (optional).
    - port: The port on which the REST API will listen (default is 8000).

    This function does the following:
    - Dynamically generates URL paths for each provided callable using `__generate_rest_api_paths__`.
    - Wraps each callable into a Tornado-compatible handler using `__create_handler_class__`.
    - Registers all handlers in a Tornado web application.
    - Starts the web server and binds it to the specified port.
    - Keeps the server running indefinitely using an `asyncio.Event().wait()` call.
    """
    web_app_items = [
        (self.__generate_rest_api_paths__(f), self.__create_handler_class__(f))
        for f in all_callables  # Presumably a list of callable functions to expose
    ]
    app = tornado.web.Application(web_app_items)
    app.listen(port=port)
    print(f'Starting REST API on address {address} port {port}')
    await asyncio.Event().wait()  # Keeps the server running

AQIS Infisical

The AQISInfisicalObj class is responsible for securely retrieving secrets from an Infisical API and setting them as environment variables at runtime. This utility automates the secret-fetching process by reading a user-defined .ini config file that includes connection information and the list of secrets to retrieve. Secrets are fetched using Infisical’s REST API and then set in the local environment using os.environ.

Imports:

from aqis import AQISInfisicalObj

Constructor:

class AQISInfisicalObj:
    def __init__(self, 
        infisical_config):
Method Parameters Description
__init__ infisical_config Initializes the AQISInfisicalObj with Infisical configuration details.

Class Methods:

def load_infisical_config(self):
Method Parameters Description
load_infisical_config (none) Loads Infisical configuration from an INI file and returns a tuple of Infisical connection details and secret list.
Return Field Type Description
infisical_url str URL of the Infisical API
authorization str Authorization token for Infisical API
environment str Environment name (e.g., dev, prod)
workspace_id str ID of the Infisical workspace
secret_path str Path prefix for secrets retrieval
workspace_slug str Slug identifier for the workspace
secrets_list List[str] List of secret keys to retrieve
infisical_request(infisical_url: str,
    headers: Dict[str, str],
    params: Dict[str, str],
    secret: str
) → dict | None
Function Parameters Description
infisical_request infisical_url:str
headers:Dict[str,str]
params:Dict[str,str]
secret:str
Sends a request to the Infisical API and returns the response as a dictionary, or None on failure.
def execute(self):
Method Parameters Description
execute (none) Executes the main functionality of the object, performing the core task defined in the class.

Configuration Format: Here is a sample .ini configuration format required by the class:

[conn]
infisical_url = https://infisical.mycompany.com
authorization = my-jwt-token
environment = dev
workspace_id = abcd1234
secret_path = /my-app/config
workspace_slug = my-workspace

[secrets]
secrets_list = DB_PASSWORD,API_KEY,SECRET_TOKEN

Logging: Logging is built-in to report:

  • Missing configuration options/sections
  • HTTP request failures
  • Missing secret values

Use Python’s logging module to configure log levels and handlers as needed.

Example Usage: Instantiate Infisical Instance

from aqis import AQISInfisicalObj  # Adjust import path accordingly

# Instantiate with path to configuration file
secrets_handler = AQISInfisicalObj("secrets_config.ini")

# The secrets are now available in the environment
import os
print(os.getenv("DB_PASSWORD"))

Database Operations: Base

This class provides a base for managing a set of named Connection objects and selecting one as "current". It supports:

  • Initializing with a default connection

  • Updating available connections

  • Getting all or the current connection

  • Switching the active connection

The class is inherited/extended by Data System Adaptors for individual (e.g. SQL, Milvus) target databases. Please add these adaptors to your AQIS Engine - Dask installation as necessary.

Imports:

from typing import Dict
from aqis.connection import Connection
  • Dict[str, Connection]: a type hint indicating a dictionary where keys are strings and values are Connection objects.

  • Connection: a class you're importing from the aqis.connection module — presumably it handles some kind of connection (e.g., to a database, API, etc.).

Constructor:

class OpCatalogBase:
    def __init__(self, 
        connection_id: str, 
        connections: Dict[str, Connection]) -> None:
Method Parameters Description
__init__ connection_id: str, connections: Dict[str, Connection] Initializes the catalog with a connection ID and a dictionary of available connections.

Class Methods:

def extend_connections(self, 
    connections: Dict[str, Connection]):
Method Parameters Description
extend_connections connections:Dict[str,Connection] Adds or updates the internal connections dictionary with new entries. Useful for dynamically expanding the set of known connections.
def get_connections(self) -> Dict[str, Connection]:
Method Parameters Description
get_connections (none) Returns the internal dictionary of all available connections.
def get_connection(self) -> Connection:
Method Parameters Description
get_connection (none) Retrieves the connection object associated with the current connection_id.
def set_connection_id(self, connection_id: str):
Method Parameters Description
set_connection_id connection_id: str Sets the current connection_id to use for retrieving connections.