Getting Started
Below, we get you started by showing you:
- how to install the AQIS Engine - Dask package,
- a short example of a workflow accessing a database using the AQIS engine,
- more details to understand the example (and AQIS Engine - Dask in general), and
- how to work with the Database Catalogue with secrets management (based on Infisical).
How to install the AQIS Engine - Dask package
The package is currently distributed via IT4I's Opencode GitLab. We recommend installing it in a virtual environment; please make sure Python 3.12 or newer is available.
Make a virtual environment:
python -m venv ~/aqis_venv
cd ~/aqis_venv
source ~/aqis_venv/bin/activate
Install Dask and AQIS Engine - Dask:
pip install dask
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
To use a data backend with the engine, install at least one of our Data System Adaptors. We recommend the Milvus adaptor, which the short example below uses. To this end:
Install Milvus adaptor dependencies:
pip install pymilvus
Install Milvus adaptor:
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-milvus.git
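Before running the example, a quick sanity check can confirm the interpreter version and that the installed packages are importable. This is a minimal sketch using only the standard library. The module names `aqis` and `aqis_adaptor_milvus` are taken from the import statements in the example below, while `dask` and `pymilvus` are the usual module names of those packages; `check_environment` is a hypothetical helper, not part of AQIS.

```python
import importlib.util
import sys

# Minimum Python version stated in the installation instructions.
MIN_PYTHON = (3, 12)

# Top-level module names assumed for the packages installed above.
REQUIRED_MODULES = ("dask", "pymilvus", "aqis", "aqis_adaptor_milvus")


def check_environment(modules=REQUIRED_MODULES):
    """Report whether the interpreter is new enough and which modules are importable."""
    python_ok = sys.version_info >= MIN_PYTHON
    # find_spec returns None when a top-level module cannot be located.
    found = {name: importlib.util.find_spec(name) is not None for name in modules}
    return python_ok, found


if __name__ == "__main__":
    python_ok, found = check_environment()
    print(f"Python {sys.version.split()[0]} >= 3.12: {python_ok}")
    for name, ok in found.items():
        print(f"{name}: {'found' if ok else 'MISSING'}")
```

Run it inside the activated virtual environment; any module reported as MISSING points to the corresponding `pip install` step.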
A short example of a workflow using the AQIS engine
The Python command sequence below performs a search in a Milvus vector database, using the AQIS Engine - Dask.
- Import the AQIS engine and Milvus adaptor:
from aqis import AQIS, DEFAULT_CONNECTIONS, Connection, ExecutionMode
from aqis_adaptor_milvus import AQISMilvus
- Create a connection:
connections = DEFAULT_CONNECTIONS.copy()
connections['milvus_default'] = Connection(
conn_id='milvus',
host=db_host, # Hostname or IP of the Milvus server
port=int(db_port), # Milvus port (19530 by default)
login=db_user,
password=db_pass,
conn_type='http'
)
- Instantiate a local AQIS instance:
aqis_local_instance = AQIS(connections=connections)
- Create the Milvus operation pack:
operation_pack = AQISMilvus()
- Run the execute_simple function, which distributes the search computation:
aqis_local_instance.execute_simple(operation_pack.search,
collection='example_collection',
anns_field='blib_feat',
output_fields=["img"],
data=query_vector,
search_params={"metric_type": "L2",
"params": {"nprobe": 10}},
print_result=True)
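The example above assumes that `db_host`, `db_port`, `db_user`, `db_pass` and `query_vector` are already defined. The sketch below shows one way to define them; the environment variable names, the empty credential defaults and the vector dimension of 128 are all assumptions. It also assumes that `AQISMilvus.search` forwards `data` to pymilvus, whose search accepts a list of query vectors.

```python
import os
import random

# Hypothetical environment variable names and defaults; adapt them to your deployment.
db_host = os.getenv("MILVUS_HOST", "localhost")
db_port = os.getenv("MILVUS_PORT", "19530")  # 19530 is Milvus's default port; the example applies int()
db_user = os.getenv("MILVUS_USER", "")
db_pass = os.getenv("MILVUS_PASSWORD", "")

# Assumed dimension; it must equal the dimension of the 'blib_feat' vector field.
DIM = 128


def make_query_batch(dim=DIM, seed=0):
    """Return a batch holding one random query vector (a list of lists of floats).

    A random vector only exercises the plumbing; real queries should be embeddings
    produced by the same model that generated the stored vectors.
    """
    rng = random.Random(seed)
    return [[rng.random() for _ in range(dim)]]


query_vector = make_query_batch()
```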
More details to understand the example (and AQIS Engine - Dask in general)
Creating a connection
To establish a connection to a Dask cluster or to a DBMS, the AQIS engine provides the Connection structure shown below. The example builds on AQIS's DEFAULT_CONNECTIONS dictionary, which provides a default set of connections that you can extend.
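# Import the default connections and the Connection structure
from aqis import DEFAULT_CONNECTIONS, Connection
# Start with a copy of the default connections to avoid modifying the original dictionary
connections = DEFAULT_CONNECTIONS.copy()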
# Add a new PostgreSQL connection entry with custom credentials and database settings
connections['pg_default'] = Connection(
conn_id='pg_default', # Unique identifier for this connection
host=db_host, # Hostname or IP of the PostgreSQL server
port=db_port, # Port where PostgreSQL is listening (typically 5432)
database='experimental_db', # Name of the database to connect to
login=db_user, # Username for authentication
password=db_pass, # Password for authentication
conn_type='postgresql' # Type of connection (used internally to handle logic)
)
# Add a Dask remote cluster connection
connections['dask_remote'] = Connection(
conn_id='dask_remote', # Unique identifier for the Dask connection
host=dask_host_ip, # IP address of the remote Dask scheduler
port=dask_port, # Port of the remote Dask scheduler (Dask's default is 8786)
conn_type='tcp' # Connection type for Dask (TCP for inter-process communication)
)
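If connection details arrive as a single URI, they can be split into the keyword arguments used by the Connection examples above. This is a sketch under the assumption that Connection accepts exactly those keyword names (conn_id, conn_type, host, port, login, password, database); `connection_kwargs_from_uri` is a hypothetical helper, not part of AQIS.

```python
from urllib.parse import urlsplit, unquote


def connection_kwargs_from_uri(conn_id, uri):
    """Split a URI such as postgresql://user:pass@host:5432/db into keyword
    arguments named like the Connection fields used above (hypothetical helper)."""
    parts = urlsplit(uri)
    kwargs = {
        "conn_id": conn_id,
        "conn_type": parts.scheme,  # e.g. 'postgresql' or 'tcp'
        "host": parts.hostname,
        "port": parts.port,  # parsed to an int, or None if absent
        # urlsplit does not percent-decode credentials, so decode them here.
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
    }
    database = parts.path.lstrip("/")
    if database:  # only database connections carry a database name
        kwargs["database"] = database
    # Drop fields that the URI did not specify.
    return {key: value for key, value in kwargs.items() if value is not None}
```

Usage would then look like `connections['pg_default'] = Connection(**connection_kwargs_from_uri('pg_default', 'postgresql://alice:s3cret@db.example.org:5432/experimental_db'))`, with all values in that URI made up.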
The AQIS DataFrame
The AQIS DataFrame (AQISDataFrame) wraps a Dask DataFrame and is the object passed to the execute_simple function for processing.
# Create an AQISDataFrame from a Pandas DataFrame (`df`) and associate it with the PostgreSQL connection (`pg_default`).
# This enables the dataframe to interact with the database for operations like reading/writing.
adf = AQISDataFrame(data=df,
connection=connections['pg_default'])
Instantiating an AQIS instance
To instantiate an AQIS instance that runs on either a local or a remote Dask cluster, use one of the following examples.
- Local execution
# Initialize AQIS instance for local execution.
# This means all tasks (data loading, processing, etc.) will run on the local machine using the local Dask scheduler.
aqis_instance = AQIS(mode=ExecutionMode.LOCAL,
connections=connections)
- Remote execution
# Initialize AQIS instance for remote execution.
# In this case, tasks will be scheduled on a remote Dask cluster using the connection defined as 'dask_remote'.
aqis_instance = AQIS(mode=ExecutionMode.REMOTE,
dask_connection_id='dask_remote',
connections=connections)
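For remote execution, the 'dask_remote' connection has to describe a reachable Dask scheduler. Dask clients address a scheduler as `tcp://host:port`; the sketch below builds that string from the connection fields. How AQIS itself turns `dask_connection_id` into a client is not documented here, so treat this as an assumption, and `scheduler_address` as a hypothetical helper.

```python
def scheduler_address(conn_type, host, port=8786):
    """Build a Dask scheduler address such as 'tcp://10.0.0.5:8786'.

    8786 is Dask's default scheduler port; conn_type 'tcp' matches the
    'dask_remote' connection defined earlier.
    """
    if not host:
        raise ValueError("a scheduler host is required for remote execution")
    return f"{conn_type}://{host}:{int(port)}"


# With dask.distributed installed, the address can be used to check that the
# scheduler is reachable before creating a REMOTE AQIS instance:
# from dask.distributed import Client
# with Client(scheduler_address("tcp", dask_host_ip, dask_port), timeout=10) as client:
#     print(client.scheduler_info()["address"])
```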
The execute_simple function
The execute_simple function lets you submit a Python callable for synchronous execution on Dask.
You pass in the Python function or method you want to run (for example, my_func); *args and **kwargs let you pass any number of positional and keyword arguments to that function.
Signature:
from typing import Callable
def execute_simple(self,
python_callable: Callable, # A reference to the function that should be executed
*args, # Positional arguments passed to the callable
print_result=False, # If True, prints the result of the function execution
**kwargs): # Keyword arguments passed to the callable
"""
Executes a Python function with optional arguments.
:param python_callable: The function to execute.
:param args: Positional arguments for the function.
:param print_result: Whether to print the result after execution.
:param kwargs: Keyword arguments for the function.
:return: The result of the function execution.
"""
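To illustrate the calling convention in the signature above, here is a local stand-in. It is not the AQIS implementation: a single worker thread replaces the Dask scheduler, so the call is submitted and the caller blocks until the result is ready. `execute_simple_sketch` and `my_func` are hypothetical names. Note that, as in the signature, `print_result` sits after `*args`, so it must be passed by keyword and is consumed by execute_simple rather than forwarded to the callable.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def execute_simple_sketch(python_callable: Callable, *args, print_result: bool = False, **kwargs) -> Any:
    """Local stand-in mirroring the execute_simple calling convention."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(python_callable, *args, **kwargs)
        result = future.result()  # blocks; re-raises any exception from the callable
    if print_result:
        print(result)
    return result


def my_func(a, b, scale=1):
    """Example callable: positional a and b, keyword scale."""
    return (a + b) * scale


# 2 and 3 go to *args, scale=10 goes to **kwargs, print_result is kept by the executor.
total = execute_simple_sketch(my_func, 2, 3, scale=10, print_result=True)  # prints 50
```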
AQIS Database Catalogue with secrets management (based on Infisical)
The AQIS engine provides an interface for secrets management implemented with Infisical, a secrets management tool. Infisical typically runs as a service on a remote server accessible over HTTPS, and the engine interacts with it through Infisical's REST API, which keeps transactions secure and overhead low. For details, see our usage scheme for Infisical as a Database (and credentials) Catalogue and/or visit the Infisical vendor page.
To request credentials from Infisical for a storage system (a DBMS, an object store, etc.), the AQIS engine requires an associated configuration file. This .ini file contains information such as:
- The Infisical server's hostname and port.
- The associated project details and an authorization token.
- The names of the secrets that are to be retrieved.
The following is a typical configuration file for connecting to an Infisical service:
[conn]
infisical_url = <Infisical hostname>
authorization = <Authorization token>
environment = <Working environment>
workspace_id = <Infisical workspace>
secret_path = <Secrets Path>
workspace_slug = <Infisical workspace slug>
# Secrets to fetch
[secrets]
secrets_list = <Secret1, secret2, secret3, ...>
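To make the file layout concrete, the sketch below reads such a configuration with Python's standard `configparser` and splits `secrets_list` into individual secret names. It is a hypothetical helper, not how AQISInfisicalObj is implemented, and treating every [conn] key as required is an assumption. All values in `EXAMPLE_CONF` are made up.

```python
import configparser

# Keys of the [conn] section shown above; treating all of them as required is an assumption.
REQUIRED_CONN_KEYS = ("infisical_url", "authorization", "environment",
                      "workspace_id", "secret_path", "workspace_slug")


def parse_infisical_config(text):
    """Parse an Infisical .ini configuration into ([conn] settings, secret names)."""
    # interpolation=None keeps '%' characters in tokens literal.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    conn = dict(parser["conn"])
    missing = [key for key in REQUIRED_CONN_KEYS if not conn.get(key)]
    if missing:
        raise ValueError(f"missing [conn] keys: {', '.join(missing)}")
    # secrets_list is comma-separated; surrounding whitespace is dropped.
    raw = parser.get("secrets", "secrets_list", fallback="")
    secrets = [name.strip() for name in raw.split(",") if name.strip()]
    return conn, secrets


EXAMPLE_CONF = """
[conn]
infisical_url = https://infisical.example.org
authorization = example-token
environment = dev
workspace_id = 0000-example
secret_path = /
workspace_slug = example-workspace
# Secrets to fetch
[secrets]
secrets_list = DATABASE_USERNAME, DATABASE_PASSWORD, DATABASE_HOSTNAME
"""

conn, secret_names = parse_infisical_config(EXAMPLE_CONF)
```

For a file on disk, `parser.read(path)` can replace `read_string`.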
Secrets retrieved from Infisical are stored as OS environment variables. The following example instantiates an AQISInfisicalObj from a configuration file and then reads the retrieved secrets from the environment:
import os
from aqis.aqis_infisical import AQISInfisicalObj
# Define the path to the Infisical configuration file
INFISICAL_CONF = "<Infisical configuration file>"
# Create an instance of AQISInfisicalObj
infisical_obj = AQISInfisicalObj(INFISICAL_CONF)
# Print environment variables
print(os.getenv("DATABASE_USERNAME"))
print(os.getenv("DATABASE_PASSWORD"))
print(os.getenv("DATABASE_HOSTNAME"))
print(os.getenv("DATABASE_PORT"))
print(os.getenv("DATABASE_NAME"))
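Because `os.getenv` silently returns None for a secret that was not retrieved, it can help to fail early before building a connection. The sketch below checks the secret names used in the example above (they must match `secrets_list` in the .ini file); `require_env` is a hypothetical helper, and the commented usage assumes the PostgreSQL Connection fields shown earlier.

```python
import os

# Secret names from the example above; they must match secrets_list in the .ini file.
EXPECTED_SECRETS = ("DATABASE_USERNAME", "DATABASE_PASSWORD", "DATABASE_HOSTNAME",
                    "DATABASE_PORT", "DATABASE_NAME")


def require_env(names=EXPECTED_SECRETS):
    """Return the named environment variables, failing early if any is unset or empty."""
    values = {name: os.getenv(name) for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise RuntimeError(f"secrets not available in the environment: {', '.join(missing)}")
    return values


# After AQISInfisicalObj(INFISICAL_CONF) has populated the environment:
# creds = require_env()
# connections['pg_default'] = Connection(
#     conn_id='pg_default',
#     host=creds["DATABASE_HOSTNAME"],
#     port=int(creds["DATABASE_PORT"]),
#     database=creds["DATABASE_NAME"],
#     login=creds["DATABASE_USERNAME"],
#     password=creds["DATABASE_PASSWORD"],
#     conn_type='postgresql')
```

Passing the values straight into the connection also avoids printing credentials such as the password to the console.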