API Reference
This API reference within the AQIS documentation describes the AQIS components (cf. Component Overview) in detail.
Default Connections
The DEFAULT_CONNECTIONS dictionary defines the default connection settings that AQIS uses for the Dask cluster.
Each entry specifies the host, port, and connection type, plus extra settings that control the cluster's resource usage: the number of workers, the threads per worker, the per-worker memory limit, and whether workers run as processes or threads. The same structure can describe either a local or a remote Dask cluster.
Note: The default connection provided in DEFAULT_CONNECTIONS is a local Dask cluster configuration. If the execution mode is set to remote, users can provide their own Dask connection details.
Imports:
from aqis import DEFAULT_CONNECTIONS
Dictionary Definition:
DEFAULT_CONNECTIONS[...]
| Key / Parameter | Description |
|---|---|
| 'dask_default' | The key in the dictionary and also the connection ID. |
| host='localhost' | The Dask scheduler is expected to run on the local machine. |
| port=41453 | Port on which the Dask scheduler is listening. |
| conn_type='tcp' | Connection type; here it is TCP/IP (suitable for Dask). |
| extra={...} | Additional parameters passed to configure the Dask cluster. |
extra Fields in DEFAULT_CONNECTIONS
| Field | Purpose |
|---|---|
| n_workers=4 | Number of Dask worker processes to spin up. |
| threads_per_worker=1 | Each worker uses a single thread (helps with isolation and debugging). |
| memory_limit='1GiB' | Each worker is restricted to 1 GiB of RAM. |
| processes | Controls whether workers are spawned as processes (True) or threads (False). Set to False on macOS for compatibility and True elsewhere by default. |
Example Usage: Default Connection Setup
import platform

from aqis.connection import Connection

# Define the default set of connections used by AQIS.
# This dictionary includes a Dask connection configuration under the key 'dask_default'.
# The Dask scheduler is expected to run locally (localhost) on port 41453 using TCP.
# Additional configuration is provided via the `extra` dictionary:
# - n_workers: Number of Dask worker processes to start.
# - threads_per_worker: Each worker will use 1 thread.
# - memory_limit: Each worker is limited to 1 GiB of memory.
# - processes: On macOS, use threads instead of processes for compatibility (False); otherwise, use processes (True).
DEFAULT_CONNECTIONS = {
    'dask_default': Connection(
        'dask_default',
        host='localhost',
        port=41453,
        conn_type='tcp',
        extra={
            'n_workers': 4,                              # Number of workers
            'threads_per_worker': 1,                     # Threads per worker
            'memory_limit': '1GiB',                      # Memory limit per worker
            'processes': platform.system() != 'Darwin',  # Threads on macOS, processes elsewhere
        }
    )
}
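For remote execution, a user-supplied connections dictionary can follow the same shape as DEFAULT_CONNECTIONS. The sketch below is illustrative only: it uses a stand-in Connection dataclass (the real class lives in aqis.connection and its exact signature is not documented here), and the key 'dask_remote', the host name, and the port are assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Connection:
    """Stand-in for aqis.connection.Connection (assumed fields only)."""
    conn_id: str
    host: str
    port: int
    conn_type: str = "tcp"
    extra: Dict[str, Any] = field(default_factory=dict)


# Hypothetical remote scheduler; replace host and port with your own values.
custom_connections = {
    "dask_remote": Connection(
        "dask_remote",
        host="dask-scheduler.example.com",
        port=8786,
        conn_type="tcp",
    )
}

# Build the scheduler address a Dask client would connect to.
conn = custom_connections["dask_remote"]
scheduler_address = f"{conn.conn_type}://{conn.host}:{conn.port}"
print(scheduler_address)
```

A dictionary like this would presumably be passed to the AQIS constructor through its connections parameter, together with dask_connection_id='dask_remote'.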
Execution Mode
The ExecutionMode class is an Enum that defines the two available modes of execution for AQIS.
This abstraction allows developers to easily toggle between development and production/distributed environments without changing the logic of their operations.
- LOCAL: Executes tasks on a local Dask cluster.
- REMOTE: Executes tasks on a remote Dask cluster.
Imports:
from aqis import ExecutionMode
Class Definition:
class ExecutionMode(Enum):
Enum Values:
| Mode | Value | Description |
|---|---|---|
| LOCAL | 'local' | Use a local Dask cluster for execution |
| REMOTE | 'remote' | Use a remote Dask cluster for execution |
Example Usage: Setting Execution Mode
from aqis import AQIS, ExecutionMode
# Create AQIS instance with remote mode
aqis = AQIS(mode=ExecutionMode.REMOTE)
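Because the enum values are plain strings, a mode can also be selected from a configuration value. The following is a minimal sketch that mirrors the two documented members in a local stand-in enum; the helper parse_mode is hypothetical and not part of AQIS.

```python
from enum import Enum


class ExecutionMode(Enum):
    """Illustrative mirror of the two documented modes."""
    LOCAL = "local"
    REMOTE = "remote"


def parse_mode(raw: str) -> ExecutionMode:
    """Hypothetical helper: map a configuration string to an ExecutionMode."""
    # Enum lookup by value raises ValueError for unknown strings.
    return ExecutionMode(raw.strip().lower())


mode = parse_mode("Remote")
print(mode, mode.value)
```

Looking a member up by value fails loudly on a typo such as "remot", which is usually preferable to silently falling back to local execution.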
AQIS Dataframe
The AQISDataFrame class is an abstraction over Dask and Pandas DataFrames. It allows users to load data from various sources (such as CSV files or Pandas DataFrames) into Dask DataFrames, enabling distributed data processing.
Imports:
from aqis import AQISDataFrame
Constructor:
class AQISDataFrame():
    def __init__(self,
                 data=None,
                 npartitions=4,
                 connection=None,
                 client=None):
| Method | Parameters | Description |
|---|---|---|
| __init__ | data=None, npartitions=4, connection=None, client=None | Initializes the AQISDataFrame with either a Dask DataFrame, Pandas DataFrame, or CSV path. |
Example Usage: Instantiate AQIS Dataframe Object
from aqis import AQISDataFrame # Adjust based on your module path
# Load data from CSV
aqis_df = AQISDataFrame(data="sample_data.csv")
# Access the internal Dask DataFrame
dask_df = aqis_df.get_dask_df()
print(dask_df.head())
AQIS Engine - Dask Interface
The AQIS class is the central interface to the AQIS Engine - Dask.
It manages Dask cluster configurations (local or remote), handles execution modes, and provides utility methods to submit distributed tasks and expose APIs.
Imports:
from aqis import AQIS
Constructor:
class AQIS:
    def __init__(self,
                 mode=ExecutionMode.LOCAL,
                 connections=DEFAULT_CONNECTIONS,
                 dask_connection_id='dask_default'):
| Method | Parameters | Description |
|---|---|---|
| __init__ | mode=ExecutionMode.LOCAL, connections=DEFAULT_CONNECTIONS, dask_connection_id='dask_default' | Initializes the AQIS object with execution mode, connections, and options. Based on the mode, the cluster is initialized accordingly. |
Class Methods:
start_rest_api(self,
               python_callables: List[Callable] | None = None,
               address=None,
               port: int = 8000
               )
| Method | Parameters | Description |
|---|---|---|
| start_rest_api | python_callables: List[Callable] \| None = None, address=None, port: int = 8000 | Starts a REST API server exposing the given Python callables. Defaults to port 8000 on all interfaces if not specified. |
execute_simple(self,
               python_callable: Callable,
               *args,
               print_result: bool = False,
               **kwargs
               )
| Method | Parameters | Description |
|---|---|---|
| execute_simple | python_callable: Callable, *args, print_result: bool = False, **kwargs | Executes a given Python callable with optional arguments. If print_result is set to True, the result is printed. |
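The calling convention of execute_simple can be sketched as follows. This is not the AQIS implementation: a ThreadPoolExecutor stands in for the Dask client, and returning the result to the caller is an assumption, since the reference only states that the result can be printed.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def execute_simple(python_callable: Callable[..., Any], *args: Any,
                   print_result: bool = False, **kwargs: Any) -> Any:
    """Run the callable on a worker pool and return its result.

    ThreadPoolExecutor is a stand-in for the Dask client used by AQIS.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        # Positional and keyword arguments are forwarded unchanged.
        result = pool.submit(python_callable, *args, **kwargs).result()
    if print_result:
        print(result)
    return result


def add(a: int, b: int = 0) -> int:
    return a + b


total = execute_simple(add, 2, b=3, print_result=True)
```

Note that print_result is keyword-only because it follows *args, so it can never be confused with a positional argument meant for the callable.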
Example Usage: Start REST API
import asyncio
from typing import Callable, List

import tornado.web

# Method of the AQIS class, shown on its own for reference.
async def start_rest_api(self,
                         python_callables: List[Callable] | None = None,
                         address=None,
                         port: int = 8000):
    """
    Starts a Tornado REST API server to allow Dask functions to be invoked remotely.
    Parameters:
    - python_callables: A list of Python callable functions that will be exposed as REST endpoints.
    - address: The address to bind the server to (optional).
    - port: The port on which the REST API will listen (default is 8000).
    This function does the following:
    - Dynamically generates URL paths for each provided callable using `__generate_rest_api_paths__`.
    - Wraps each callable into a Tornado-compatible handler using `__create_handler_class__`.
    - Registers all handlers in a Tornado web application.
    - Starts the web server and binds it to the specified address and port.
    - Keeps the server running indefinitely using an `asyncio.Event().wait()` call.
    """
    # Assumption: the original listing iterates over an undefined `all_callables`;
    # here it is taken to be the callables passed in.
    all_callables = list(python_callables or [])
    web_app_items = [
        (self.__generate_rest_api_paths__(f), self.__create_handler_class__(f))
        for f in all_callables
    ]
    app = tornado.web.Application(web_app_items)
    app.listen(port, address=address)  # address=None listens on all interfaces
    print(f'Starting REST API on address {address} port {port}')
    await asyncio.Event().wait()  # Keeps the server running
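The route-building step can be illustrated without Tornado. The path scheme below (one endpoint per callable, named after the function) is purely an assumption; the scheme actually produced by `__generate_rest_api_paths__` is not documented in this reference, and generate_path and build_routes are hypothetical helpers.

```python
from typing import Callable, Dict, List


def generate_path(func: Callable) -> str:
    """Hypothetical path scheme: one endpoint per callable, named after it."""
    return f"/{func.__name__}"


def build_routes(python_callables: List[Callable]) -> Dict[str, Callable]:
    """Map each generated path to its callable, rejecting duplicate names."""
    routes: Dict[str, Callable] = {}
    for func in python_callables:
        path = generate_path(func)
        if path in routes:
            raise ValueError(f"Duplicate REST path: {path}")
        routes[path] = func
    return routes


def square(x: int) -> int:
    return x * x


def greet(name: str) -> str:
    return f"Hello, {name}"


routes = build_routes([square, greet])
print(sorted(routes))
```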
AQIS Infisical
The AQISInfisicalObj class is responsible for securely retrieving secrets from an Infisical API and setting them as environment variables at runtime. This utility automates the secret-fetching process by reading a user-defined .ini config file that includes connection information and the list of secrets to retrieve. Secrets are fetched using Infisical’s REST API and then set in the local environment using os.environ.
Imports:
from aqis import AQISInfisicalObj
Constructor:
class AQISInfisicalObj:
    def __init__(self,
                 infisical_config):
| Method | Parameters | Description |
|---|---|---|
| __init__ | infisical_config | Initializes the AQISInfisicalObj with Infisical configuration details. |
Class Methods:
def load_infisical_config(self):
| Method | Parameters | Description |
|---|---|---|
| load_infisical_config | (none) | Loads Infisical configuration from an INI file and returns a tuple of Infisical connection details and secret list. |

| Return Field | Type | Description |
|---|---|---|
| infisical_url | str | URL of the Infisical API |
| authorization | str | Authorization token for Infisical API |
| environment | str | Environment name (e.g., dev, prod) |
| workspace_id | str | ID of the Infisical workspace |
| secret_path | str | Path prefix for secrets retrieval |
| workspace_slug | str | Slug identifier for the workspace |
| secrets_list | List[str] | List of secret keys to retrieve |
infisical_request(infisical_url: str,
                  headers: Dict[str, str],
                  params: Dict[str, str],
                  secret: str
                  ) -> dict | None
| Function | Parameters | Description |
|---|---|---|
| infisical_request | infisical_url: str, headers: Dict[str, str], params: Dict[str, str], secret: str | Sends a request to the Infisical API and returns the response as a dictionary, or None on failure. |
def execute(self):
| Method | Parameters | Description |
|---|---|---|
| execute | (none) | Runs the object's main task. Per the class description above, this means retrieving the configured secrets from Infisical and setting them as environment variables. |
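The fetch-then-export flow described for this class can be sketched as below. This is an illustration under stated assumptions, not the AQIS implementation: export_secrets is a hypothetical helper, and a dictionary lookup stands in for the Infisical request.

```python
import logging
import os
from typing import Callable, Dict, List, Optional

logger = logging.getLogger(__name__)


def export_secrets(secrets_list: List[str],
                   fetch: Callable[[str], Optional[str]]) -> Dict[str, bool]:
    """Set each fetched secret in os.environ; report which ones were found."""
    status: Dict[str, bool] = {}
    for name in secrets_list:
        value = fetch(name)
        if value is None:
            # Missing values are logged and skipped rather than exported.
            logger.warning("No value returned for secret %s", name)
            status[name] = False
            continue
        os.environ[name] = value
        status[name] = True
    return status


# Fake lookup standing in for the Infisical request.
fake_store = {"DB_PASSWORD": "example-password"}
result = export_secrets(["DB_PASSWORD", "API_KEY"], fake_store.get)
print(result)
```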
Configuration Format:
Here is a sample .ini configuration format required by the class:
[conn]
infisical_url = https://infisical.mycompany.com
authorization = my-jwt-token
environment = dev
workspace_id = abcd1234
secret_path = /my-app/config
workspace_slug = my-workspace
[secrets]
secrets_list = DB_PASSWORD,API_KEY,SECRET_TOKEN
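A file in this format can be read with Python's standard configparser module. The sketch below parses the sample shown above from a string; splitting secrets_list on commas is an assumption about how the class interprets that value.

```python
import configparser

SAMPLE = """
[conn]
infisical_url = https://infisical.mycompany.com
authorization = my-jwt-token
environment = dev
workspace_id = abcd1234
secret_path = /my-app/config
workspace_slug = my-workspace

[secrets]
secrets_list = DB_PASSWORD,API_KEY,SECRET_TOKEN
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE)

# Connection details as a plain dictionary.
conn = dict(parser["conn"])

# Split the comma-separated list and drop empty entries.
raw_list = parser["secrets"]["secrets_list"]
secrets_list = [s.strip() for s in raw_list.split(",") if s.strip()]

print(conn["infisical_url"], secrets_list)
```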
Logging: Built-in logging reports:
- Missing configuration options/sections
- HTTP request failures
- Missing secret values
Use Python’s logging module to configure log levels and handlers as needed.
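A minimal logging setup might look like the following. The logger name that AQIS uses is not documented here, so this sketch configures the root logger; the chosen level and format are illustrative.

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    force=True,  # replace handlers installed earlier (Python 3.8+)
)

root_logger = logging.getLogger()
root_logger.info("Logging configured")
```

Run this before instantiating AQISInfisicalObj so that messages emitted during configuration loading are not lost.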
Example Usage: Instantiate Infisical Instance
from aqis import AQISInfisicalObj # Adjust import path accordingly
# Instantiate with path to configuration file
secrets_handler = AQISInfisicalObj("secrets_config.ini")
# The secrets are now available in the environment
import os
print(os.getenv("DB_PASSWORD"))
Database Operations: Base
The OpCatalogBase class is a base class for managing a set of named Connection objects and selecting one of them as the current connection. It supports:
- Initializing with a default connection
- Updating the available connections
- Getting all connections or the current one
- Switching the active connection
Data System Adaptors for individual target databases (e.g. SQL, Milvus) inherit from and extend this class. Add these adaptors to your AQIS Engine - Dask installation as needed.
Imports:
from typing import Dict
from aqis.connection import Connection
- Dict[str, Connection]: a type hint for a dictionary whose keys are strings and whose values are Connection objects.
- Connection: the class imported from the aqis.connection module. It describes a single connection; in DEFAULT_CONNECTIONS it carries a connection ID, host, port, conn_type, and extra settings.
Constructor:
class OpCatalogBase:
    def __init__(self,
                 connection_id: str,
                 connections: Dict[str, Connection]) -> None:
| Method | Parameters | Description |
|---|---|---|
| __init__ | connection_id: str, connections: Dict[str, Connection] | Initializes the catalog with a connection ID and a dictionary of available connections. |
Class Methods:
def extend_connections(self,
                       connections: Dict[str, Connection]):
| Method | Parameters | Description |
|---|---|---|
| extend_connections | connections: Dict[str, Connection] | Adds or updates the internal connections dictionary with new entries. Useful for dynamically expanding the set of known connections. |
def get_connections(self) -> Dict[str, Connection]:
| Method | Parameters | Description |
|---|---|---|
| get_connections | (none) | Returns the internal dictionary of all available connections. |
def get_connection(self) -> Connection:
| Method | Parameters | Description |
|---|---|---|
| get_connection | (none) | Retrieves the connection object associated with the current connection_id. |
def set_connection_id(self, connection_id: str):
| Method | Parameters | Description |
|---|---|---|
| set_connection_id | connection_id: str | Sets the current connection_id to use for retrieving connections. |
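The documented interface can be illustrated with a small stand-in. The class below re-implements the five methods from their signatures and descriptions only; it is not the AQIS source, the Connection dataclass is a simplified placeholder for aqis.connection.Connection, and the connection IDs and host names are made up.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Connection:
    """Stand-in for aqis.connection.Connection (assumed fields only)."""
    conn_id: str
    host: str = "localhost"


class OpCatalogBase:
    """Illustrative re-implementation of the documented interface."""

    def __init__(self, connection_id: str,
                 connections: Dict[str, Connection]) -> None:
        self.connection_id = connection_id
        self.connections = dict(connections)

    def extend_connections(self, connections: Dict[str, Connection]) -> None:
        # Existing IDs are overwritten, new IDs are added.
        self.connections.update(connections)

    def get_connections(self) -> Dict[str, Connection]:
        return self.connections

    def get_connection(self) -> Connection:
        return self.connections[self.connection_id]

    def set_connection_id(self, connection_id: str) -> None:
        self.connection_id = connection_id


catalog = OpCatalogBase("sql_dev", {"sql_dev": Connection("sql_dev")})
catalog.extend_connections(
    {"sql_prod": Connection("sql_prod", host="db.example.com")}
)
catalog.set_connection_id("sql_prod")
print(catalog.get_connection().host)
```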