Data System Adaptors
A collection of Data System Adaptors is offered in EXA4MIND to make it easy for you to address data backends (database management systems or object stores).
These adaptors encapsulate standard operations to be executed from Airflow or Python-Dask environments.
Data System Adaptors for Airflow
If you use Airflow to automate your data analytics, Airflow operators are available for standard database operations, in particular CRUD operations. Following Airflow semantics, these operators are grouped into Airflow providers.
For a few backends, we have written new providers:
- iRODS
- Milvus
For others, we rely on existing providers and give usage examples:
- PostgreSQL
- MariaDB
- OpenSearch
- S3 (MinIO)
Our providers and usage examples are published under https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-airflow.
Data System Adaptors for AQIS Engine - Dask
To provide a uniform interface for database operations from the AQIS Engine - Dask environment, we have derived database-specific Data System Adaptor classes from the OpCatalogBase operation-standardisation superclass (cf. Component Overview and API reference).
Following the modular AQIS Engine design, these classes are installed separately, according to your needs. You can find the adaptor repositories under https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask.
The classes are described below. For other (non-adaptor) class references please consult the API reference.
Milvus Adaptor
The AQISMilvus class is a specialized database client built on top of OpCatalogBase and is used for interacting with Milvus, a vector database. It uses the pymilvus client to connect and perform operations.
It enables:
- Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
- Collection Management to easily list all collections and retrieve schema information with list_collections() and describe_collection().
- Vector data insertion into Milvus collections via insert() – supports high-dimensional vector storage, e.g., from embeddings.
Imports and Dependencies:
from typing import Dict
from aqis.operations import OpCatalogBase, Connection
from pymilvus import MilvusClient
- Dict: Type annotations using dictionaries.
- OpCatalogBase: The base class for managing multiple database connections.
- Connection: The Connection object which represents connection details.
- MilvusClient: The official Milvus Python client.
Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):
pip install dask pymilvus
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-milvus.git
Constructor:
class AQISMilvus(OpCatalogBase):
    def __init__(self,
                 milvus_connection_id: str = "milvus_default",
                 connections: Dict[str, Connection] = {}) -> None:
| Method | Parameters | Description |
|---|---|---|
| __init__ | milvus_connection_id: str = "milvus_default", connections: Dict[str, Connection] = {} | Initializes the AQISMilvus instance with a default or custom connection ID and an optional set of connections. |
Class Methods:
def list_collections(self):
| Method | Parameters | Description |
|---|---|---|
| list_collections | (none) | Retrieves a list of all collections in the Milvus database using the MilvusClient. |
def describe_collection(self,
                        collection):
| Method | Parameters | Description |
|---|---|---|
| describe_collection | collection: str | Retrieves the schema description of the specified collection in the Milvus database. |
def insert(self,
           collection,
           data):
| Method | Parameters | Description |
|---|---|---|
| insert | collection: str, data: any | Inserts data into the specified collection in the Milvus database. |
def search(self,
           collection,
           anns_field,
           output_fields: list[str],
           data,
           limit=5,
           search_params=None):
| Method | Parameters | Description |
|---|---|---|
| search | collection: str, anns_field: str, output_fields: list[str], data: any, limit: int = 5, search_params: dict \| None = None | Performs a vector similarity search on the specified collection using the provided data and parameters. |
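The insert() and search() signatures above can be exercised as in the following minimal sketch. It assumes that the adaptor forwards its arguments to the pymilvus client unchanged, so that rows are given as a list of dictionaries and query vectors as a list of vectors. The collection name documents, the field names id, embedding and text, and the RecordingAdaptor stand-in are hypothetical and only allow a dry run without a Milvus server.

```python
from typing import Any


def build_rows(ids: list[int],
               embeddings: list[list[float]],
               texts: list[str]) -> list[dict[str, Any]]:
    """Pack parallel lists into one dictionary per entity."""
    if not (len(ids) == len(embeddings) == len(texts)):
        raise ValueError("ids, embeddings and texts must have the same length")
    return [
        {"id": i, "embedding": vec, "text": txt}
        for i, vec, txt in zip(ids, embeddings, texts)
    ]


def insert_and_query(adaptor: Any,
                     collection: str,
                     rows: list[dict[str, Any]],
                     query_vector: list[float]) -> Any:
    """Insert the rows, then search for the nearest neighbours of one vector."""
    adaptor.insert(collection, rows)
    return adaptor.search(
        collection,
        anns_field="embedding",      # hypothetical vector field name
        output_fields=["text"],      # hypothetical scalar field name
        data=[query_vector],         # one query vector per list entry
        limit=3,
        search_params={"metric_type": "COSINE"},
    )


class RecordingAdaptor:
    """Hypothetical stand-in for AQISMilvus that only records calls."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    def insert(self, collection, data):
        self.calls.append(("insert", collection, data))

    def search(self, collection, anns_field, output_fields, data,
               limit=5, search_params=None):
        self.calls.append(("search", collection, anns_field, limit))
        return []


rows = build_rows([1, 2], [[0.1, 0.2], [0.3, 0.4]], ["first", "second"])
dry_run = RecordingAdaptor()
hits = insert_and_query(dry_run, "documents", rows, [0.1, 0.2])
```

With a real AQISMilvus instance, pass it in place of RecordingAdaptor(). The field names then have to match the schema reported by describe_collection().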
MariaDB Adaptor
The AQISMariaDB class is a specialized database client built on top of OpCatalogBase and is used for interacting with MariaDB. The interface supports operations on a MariaDB (or MySQL-compatible) database using SQLAlchemy and Dask.
It enables:
- Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
- Reading from and writing to the database using Dask DataFrames and executing raw SQL statements.
Imports and Dependencies:
from typing import Dict
from dask.dataframe import read_sql_query, to_sql
from dask.distributed import print
from sqlalchemy import TextClause
from pandas import DataFrame
import sqlalchemy
from aqis.operations import OpCatalogBase, Connection
- Dict: Defines type annotations using dictionaries.
- read_sql_query: Used to read data from SQL databases into a Dask DataFrame in parallel.
- to_sql: Used to write a Dask DataFrame to a SQL table.
- TextClause: Represents an SQL text fragment or statement in SQLAlchemy.
- DataFrame: Class from the Pandas library.
- sqlalchemy: Imports the full SQLAlchemy package.
- OpCatalogBase: A base class for managing multiple database connections.
- Connection: The Connection object which represents connection details.
Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):
pip install dask sqlalchemy pymysql
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-mariadb.git
Constructor:
class AQISMariaDB(OpCatalogBase):
    def __init__(self,
                 db_connection_id: str = "db_default",
                 connections: Dict[str, Connection] = {}) -> None:
| Method | Parameters | Description |
|---|---|---|
| __init__ | db_connection_id: str = "db_default", connections: Dict[str, Connection] = {} | Initializes the AQISMariaDB object with a database connection ID and an optional dictionary of connections. |
Class Methods:
def read(self,
         alchemysql: sqlalchemy.Select,
         index_col: any,
         *args,
         print_dataframe=False,
         **kwargs) -> DataFrame:
| Method | Parameters | Description |
|---|---|---|
| read | alchemysql: sqlalchemy.Select, index_col: any, *args, print_dataframe: bool = False, **kwargs | Reads data from the MariaDB database using a SQLAlchemy Select query and returns a Dask DataFrame. If print_dataframe is True, the DataFrame is printed. |
def exec_sql(self,
             sql: TextClause):
| Method | Parameters | Description |
|---|---|---|
| exec_sql | sql: sqlalchemy.TextClause | Executes a raw SQL statement using SQLAlchemy. Begins a transaction, executes the SQL, commits, and closes the connection. |
def write(self,
          dataframe: DataFrame,
          name: str,
          *args,
          **kwargs) -> int | None:
| Method | Parameters | Description |
|---|---|---|
| write | dataframe: pandas.DataFrame, name: str, *args, **kwargs | Writes a Pandas DataFrame to the MariaDB database table with the specified name using Dask's to_sql. |
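The arguments expected by read() and exec_sql() can be prepared with SQLAlchemy Core, as in the minimal sketch below. It assumes that read() hands the Select object and index_col to Dask's read_sql_query, which requires index_col to name a column contained in the selection. The measurements table, its columns and the RecordingAdaptor stand-in are hypothetical and only allow a dry run without a database.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select, text

# Hypothetical table definition, used only to build statements.
metadata = MetaData()
measurements = Table(
    "measurements",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("station", String(64)),
    Column("value", Integer),
)

# A Select object for read(); the index column "id" is part of the selection.
stmt = select(measurements).where(measurements.c.value > 10)

# A TextClause for exec_sql(), here a plain DDL statement.
ddl = text(
    "CREATE TABLE IF NOT EXISTS measurements "
    "(id INT PRIMARY KEY, station VARCHAR(64), value INT)"
)


class RecordingAdaptor:
    """Hypothetical stand-in for AQISMariaDB that only records calls."""

    def __init__(self) -> None:
        self.calls = []

    def exec_sql(self, sql):
        self.calls.append(("exec_sql", str(sql)))

    def read(self, alchemysql, index_col, *args, print_dataframe=False, **kwargs):
        self.calls.append(("read", str(alchemysql), index_col))


def create_and_load(adaptor):
    """Create the table if needed, then read the filtered rows indexed by id."""
    adaptor.exec_sql(ddl)
    return adaptor.read(stmt, "id")


dry_run = RecordingAdaptor()
create_and_load(dry_run)
```

Passing a real AQISMariaDB instance to create_and_load() would run the same two steps against the configured database.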
MongoDB Adaptor
The AQISMongoDB class is a specialized database client built on top of OpCatalogBase and is used for interacting with MongoDB. The interface uses PyMongo to perform standard database operations on a MongoDB instance.
It enables:
- Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
- Functionality for querying the database, listing databases and collections, inserting documents into the database, and for dropping collections from the database.
Imports and Dependencies:
from typing import Dict, Any
from aqis.operations import OpCatalogBase, Connection
from pymongo import MongoClient
- Dict, Any: Dict and Any are types from Python's typing module.
- OpCatalogBase: The base class for managing multiple database connections.
- Connection: An object representing a database connection.
- MongoClient: The MongoClient class from the pymongo library.
Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):
pip install dask pymongo
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-mongodb.git
Constructor:
class AQISMongoDB(OpCatalogBase):
    def __init__(self,
                 mongo_connection_id: str = "mongo_default",
                 connections: Dict[str, Connection] = {}) -> None:
| Method | Parameters | Description |
|---|---|---|
| __init__ | mongo_connection_id: str = "mongo_default", connections: Dict[str, Connection] = {} | Initializes the AQISMongoDB object with a database connection ID and an optional dictionary of connections. |
Class Methods:
def list_databases(self):
| Method | Parameters | Description |
|---|---|---|
| list_databases | None | Returns a list of all database names available in the MongoDB instance. |
def insert(self,
           database: str,
           collection: str,
           data: list[dict]):
| Method | Parameters | Description |
|---|---|---|
| insert | database: str, collection: str, data: list[dict] | Inserts a list of documents (data) into the specified collection of the given database. |
def find(self,
         database: str,
         collection: str,
         query: dict = {},
         projection: dict = None) -> list[dict]:
| Method | Parameters | Description |
|---|---|---|
| find | database: str, collection: str, query: dict = {}, projection: dict = None | Finds documents in a collection matching the given query. Optionally specify a projection to limit fields. |
def drop_collection(self,
                    database: str,
                    collection: str):
| Method | Parameters | Description |
|---|---|---|
| drop_collection | database: str, collection: str | Drops the specified collection from the given MongoDB database. |
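The following minimal sketch illustrates the shape of the query and projection dictionaries accepted by find(), using standard MongoDB filter syntax. The database name demo_db, the collection readings, the document fields and the InMemoryAdaptor stand-in are hypothetical. The stand-in only models equality and $gte conditions so that the example runs without a MongoDB server.

```python
from typing import Any


def _matches(value: Any, condition: Any) -> bool:
    """Evaluate equality or a {"$gte": x} condition; other operators are not modelled."""
    if isinstance(condition, dict) and "$gte" in condition:
        return value is not None and value >= condition["$gte"]
    return value == condition


class InMemoryAdaptor:
    """Hypothetical stand-in mirroring the documented AQISMongoDB methods."""

    def __init__(self) -> None:
        self.store: dict[tuple[str, str], list[dict]] = {}

    def insert(self, database: str, collection: str, data: list[dict]) -> None:
        self.store.setdefault((database, collection), []).extend(data)

    def find(self, database, collection, query=None, projection=None) -> list[dict]:
        docs = self.store.get((database, collection), [])
        hits = [d for d in docs
                if all(_matches(d.get(k), c) for k, c in (query or {}).items())]
        if projection is None:
            return hits
        # Keep only the fields flagged with a truthy value in the projection.
        keep = [field for field, flag in projection.items() if flag]
        return [{f: d[f] for f in keep if f in d} for d in hits]

    def drop_collection(self, database: str, collection: str) -> None:
        self.store.pop((database, collection), None)


def readings_at_least(adaptor: Any, threshold: float) -> list[dict]:
    """Return sensor name and value of all readings with value >= threshold."""
    query = {"value": {"$gte": threshold}}
    projection = {"_id": 0, "sensor": 1, "value": 1}
    return adaptor.find("demo_db", "readings", query, projection)


adaptor = InMemoryAdaptor()
adaptor.insert("demo_db", "readings", [
    {"sensor": "a", "value": 12},
    {"sensor": "b", "value": 25},
    {"sensor": "c", "value": 31},
])
warm = readings_at_least(adaptor, 20)
adaptor.drop_collection("demo_db", "readings")
empty = adaptor.find("demo_db", "readings")
```

The helper readings_at_least() only relies on the documented find() signature, so a real AQISMongoDB instance could be passed to it instead of the stand-in.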
OpenSearch Adaptor
The AQISOpenSearch class is a specialized client built on top of OpCatalogBase and is used for interacting with OpenSearch. The class uses opensearchpy to provide a high-level interface to an OpenSearch cluster.
It enables:
- Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
- Listing indexes, indexing documents, and executing search queries, provided through opensearchpy.
Imports and Dependencies:
from typing import Dict, Any
from aqis.operations import OpCatalogBase, Connection
from opensearchpy import OpenSearch
- Dict, Any: Dict and Any are types from Python's typing module.
- OpCatalogBase: The base class for managing multiple database connections.
- OpenSearch: The OpenSearch class from the opensearch-py package.
Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):
pip install dask opensearch-py
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-opensearch.git
Constructor:
class AQISOpenSearch(OpCatalogBase):
    def __init__(self,
                 os_connection_id: str = "opensearch_default",
                 connections: Dict[str, Connection] = {}) -> None:
| Method | Parameters | Description |
|---|---|---|
| __init__ | os_connection_id: str = "opensearch_default", connections: Dict[str, Connection] = {} | Initializes the OpenSearch interface with the given connection ID and optional connection dictionary. |
Class Methods:
def list_indexes(self) -> list[str]:
| Method | Parameters | Description |
|---|---|---|
| list_indexes | None | Returns a list of all index names in the connected OpenSearch cluster. |
def index_doc(self,
              index: str,
              doc: Dict[str, Any]) -> dict:
| Method | Parameters | Description |
|---|---|---|
| index_doc | index: str, doc: Dict[str, Any] | Indexes a single document (doc) into the specified OpenSearch index. |
def search(self,
           index: str,
           query: Dict[str, Any]) -> list[dict]:
| Method | Parameters | Description |
|---|---|---|
| search | index: str, query: Dict[str, Any] | Executes a search query against the specified index and returns the hits. |
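As a minimal sketch of how index_doc() and search() might be combined, the example below builds a query-DSL dictionary for a full-text match. It assumes that the query argument is used as the search request body. The index name app-logs, the message field and the RecordingAdaptor stand-in are hypothetical and only allow a dry run without an OpenSearch cluster.

```python
from typing import Any


def build_match_query(field: str, phrase: str, size: int = 10) -> dict[str, Any]:
    """Build a query-DSL body for a full-text match on one field."""
    return {"size": size, "query": {"match": {field: phrase}}}


class RecordingAdaptor:
    """Hypothetical stand-in for AQISOpenSearch that only records calls."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    def index_doc(self, index: str, doc: dict[str, Any]) -> dict:
        self.calls.append(("index_doc", index, doc))
        return {"result": "created"}

    def search(self, index: str, query: dict[str, Any]) -> list[dict]:
        self.calls.append(("search", index, query))
        return []


def log_and_find(adaptor: Any, index: str, message: str, phrase: str) -> list[dict]:
    """Index one log message, then search the same index for a phrase."""
    adaptor.index_doc(index, {"message": message})
    return adaptor.search(index, build_match_query("message", phrase))


dry_run = RecordingAdaptor()
hits = log_and_find(dry_run, "app-logs", "connection timeout on node 3", "timeout")
```

On a real cluster, a newly indexed document becomes searchable only after the next index refresh, so an immediate search may not return it yet.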
PostgreSQL Adaptor
The AQISPostgreSQL class is a specialized client built on top of OpCatalogBase and is used for interacting with PostgreSQL.
The class provides Dask-based access to PostgreSQL databases. The interface supports operations on a PostgreSQL database using SQLAlchemy and Dask.
It enables:
- Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
- Standard database operations such as reading from and writing to the database using Dask DataFrames, and executing raw SQL statements using sqlalchemy.
Imports and Dependencies:
from typing import Dict
from dask.dataframe import read_sql_query, to_sql
from dask.distributed import print
from sqlalchemy import TextClause
from pandas import DataFrame
import sqlalchemy
from aqis.operations import OpCatalogBase, Connection
- Dict: Type annotations using dictionaries.
- read_sql_query: Used to read data from SQL databases into a Dask DataFrame in parallel.
- to_sql: Used to write a Dask DataFrame to a SQL table.
- print: A special print function from dask.distributed.
- TextClause: Represents a SQL text fragment or statement in SQLAlchemy.
- DataFrame: A class from the pandas library.
- sqlalchemy: The entire SQLAlchemy library.
- OpCatalogBase: The base class for managing multiple database connections.
- Connection: An object representing connection details.
Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):
pip install dask sqlalchemy psycopg2-binary
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-postgres.git
Constructor:
class AQISPostgreSQL(OpCatalogBase):
    def __init__(self,
                 db_connection_id: str = "db_default",
                 connections: Dict[str, Connection] = {}) -> None:
| Method | Parameters | Description |
|---|---|---|
| __init__ | db_connection_id: str = "db_default", connections: Dict[str, Connection] = {} | Initializes the Postgres interface with the given connection ID and optional connection dictionary. |
Class Methods:
def read(self,
         alchemysql: sqlalchemy.Select,
         index_col: any,
         *args,
         print_dataframe=False,
         **kwargs) -> DataFrame:
| Method | Parameters | Description |
|---|---|---|
| read | alchemysql: sqlalchemy.Select, index_col: any, *args, print_dataframe: bool = False, **kwargs | Reads data from PostgreSQL using a SQLAlchemy Select object. Supports Dask's parallelism. If print_dataframe is True, prints the resulting Dask DataFrame. Returns a Dask DataFrame. |
def exec_sql(self,
             sql: TextClause):
| Method | Parameters | Description |
|---|---|---|
| exec_sql | sql: TextClause | Executes a raw SQL command against the PostgreSQL database. |
def write(self,
          dataframe: DataFrame,
          name: str,
          *args,
          **kwargs) -> int | None:
| Method | Parameters | Description |
|---|---|---|
| write | dataframe: DataFrame, name: str, *args, **kwargs | Writes a Dask or Pandas DataFrame to a PostgreSQL table with the given name. Returns the number of rows written or None. |
S3 Adaptor
The AQISS3 class is a specialized database client built on top of OpCatalogBase and is used for interacting with MinIO. The class uses boto3 to interact with S3-compatible object storage such as MinIO.
It enables:
- Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
- Functionality for file uploads, file downloads, and also listing operations using boto3.
Imports and Dependencies:
from typing import Dict
from pandas import DataFrame
import boto3
import os
from aqis.operations import OpCatalogBase, Connection
- Dict: Type annotations using dictionaries.
- DataFrame: The DataFrame class from the Pandas library.
- boto3: The boto3 library, which is the official AWS SDK for Python.
- OpCatalogBase: The base class for managing multiple database connections.
- Connection: The Connection object which represents connection details.
Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):
pip install dask boto3
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-s3.git
Constructor:
class AQISS3(OpCatalogBase):
    def __init__(self,
                 s3_connection_id: str = "s3_minio",
                 connections: Dict[str, Connection] = {}) -> None:
| Method | Parameters | Description |
|---|---|---|
| __init__ | s3_connection_id: str = "s3_minio", connections: Dict[str, Connection] = {} | Initializes the AQISS3 object with credentials and bucket info from a Connection. Sets up the boto3 client. |
Class Methods:
def upload_file(self,
                file_path: str,
                object_name: str) -> None:
| Method | Parameters | Description |
|---|---|---|
| upload_file | file_path: str, object_name: str | Uploads a local file (file_path) to the S3 bucket using the specified object_name as the key. |
def download_file(self,
                  object_name: str,
                  file_path: str) -> None:
| Method | Parameters | Description |
|---|---|---|
| download_file | object_name: str, file_path: str | Downloads a file from the S3 bucket (object_name) to a local path (file_path). |
def list_files(self) -> list:
| Method | Parameters | Description |
|---|---|---|
| list_files | None | Lists all object keys in the configured S3 bucket. Returns a list of file names. |
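A typical sequence of upload_file(), list_files() and download_file() is sketched below. Note the differing argument order of the two transfer methods: upload_file() takes the local path first, download_file() the object name first. The LocalDirAdaptor stand-in, which treats a local directory as the bucket, and the file names are hypothetical. They only make the sketch runnable without an S3 endpoint.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Any


class LocalDirAdaptor:
    """Hypothetical stand-in for AQISS3 that uses a local directory as bucket."""

    def __init__(self, bucket_dir: Path) -> None:
        self.bucket_dir = bucket_dir

    def upload_file(self, file_path: str, object_name: str) -> None:
        shutil.copyfile(file_path, self.bucket_dir / object_name)

    def download_file(self, object_name: str, file_path: str) -> None:
        shutil.copyfile(self.bucket_dir / object_name, file_path)

    def list_files(self) -> list:
        return sorted(p.name for p in self.bucket_dir.iterdir())


def round_trip(adaptor: Any, source: Path, object_name: str, target: Path) -> bool:
    """Upload a file, check that it is listed, download it and compare contents."""
    adaptor.upload_file(str(source), object_name)
    if object_name not in adaptor.list_files():
        raise FileNotFoundError(f"{object_name} is not listed after upload")
    adaptor.download_file(object_name, str(target))
    return source.read_bytes() == target.read_bytes()


with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    bucket = tmp_path / "bucket"
    bucket.mkdir()
    source = tmp_path / "report.csv"
    source.write_text("id,value\n1,42\n")

    adaptor = LocalDirAdaptor(bucket)
    ok = round_trip(adaptor, source, "report.csv", tmp_path / "copy.csv")
    listed = adaptor.list_files()
```

Since round_trip() only uses the three documented methods, a real AQISS3 instance could be passed in place of the stand-in.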