Data System Adaptors

A collection of Data System Adaptors is offered in EXA4MIND to make it easy for you to address data backends (database management systems or object stores).

These adaptors encapsulate standard operations to be executed from Airflow or Python-Dask environments.

Data System Adaptors for Airflow

When you use Airflow to automate your data analytics, you will find Airflow operators for standard database operations, in particular CRUD operations. Following Airflow conventions, these operators are grouped into Airflow providers.

For a few backends, we have written new providers:

  • iRODS
  • Milvus

while for others, we are relying on existing providers and give usage examples:

  • PostgreSQL
  • MariaDB
  • OpenSearch
  • S3 (MinIO)

Our providers and usage examples are published under https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-airflow.

Data System Adaptors for AQIS Engine - Dask

To provide a uniform way of running database operations from the AQIS Engine - Dask environment, we have derived database-specific Data System Adaptor classes from the OpCatalogBase operation-standardisation superclass (cf. Component Overview and API reference).

These classes have to be installed separately according to your needs in our modularised AQIS-Engine paradigm. You can find the adaptor repositories under https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask.

The classes are described below. For other (non-adaptor) class references please consult the API reference.

Milvus Adaptor

The AQISMilvus class is a specialized database client built on top of OpCatalogBase and is used for interacting with Milvus, a vector database. It uses the pymilvus client to connect and perform operations.

It enables:

  • Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
  • Collection Management to easily list all collections and retrieve schema information with list_collections() and describe_collection().
  • Vector data insertion into Milvus collections via insert() – supports high-dimensional vector storage, e.g., from embeddings.

Imports and Dependencies:

from typing import Dict
from aqis.operations import OpCatalogBase, Connection
from pymilvus import MilvusClient
  • Dict: Type annotations using dictionaries.
  • OpCatalogBase: The base class for managing multiple database connections.
  • Connection: The Connection object which represents connection details.
  • MilvusClient: The official Milvus Python client.

Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):

pip install dask pymilvus
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-milvus.git

Constructor:

class AQISMilvus(OpCatalogBase):
    def __init__(self,
        milvus_connection_id: str = "milvus_default",
        connections: Dict[str, Connection] = {}) -> None:
Method: __init__
Parameters:
  • milvus_connection_id: str = "milvus_default"
  • connections: Dict[str, Connection] = {}
Description: Initializes the AQISMilvus instance with a default or custom connection ID and an optional set of connections.
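
The constructor pattern above (a connection ID plus a dictionary of connections) recurs in every adaptor. The following is a minimal sketch of how such a lookup could work. SketchConnection and SketchCatalog are hypothetical stand-ins written for this illustration; the fields of the real Connection class and the internals of OpCatalogBase are not documented in this section and may differ.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class SketchConnection:
    """Hypothetical stand-in for aqis.operations.Connection (fields assumed)."""
    host: str
    port: int
    login: str
    password: str


class SketchCatalog:
    """Hypothetical stand-in for a class derived from OpCatalogBase."""

    def __init__(self, connection_id: str = "milvus_default",
                 connections: Optional[Dict[str, SketchConnection]] = None) -> None:
        # Copy the mapping so that instances never share mutable state.
        self._connections = dict(connections or {})
        self._connection_id = connection_id

    def get_connection(self) -> SketchConnection:
        # Resolve the configured ID and fail early with a readable message.
        try:
            return self._connections[self._connection_id]
        except KeyError:
            raise KeyError(
                f"no connection registered under {self._connection_id!r}"
            ) from None


# 19530 is the default Milvus port; the credentials are placeholders.
connections = {
    "milvus_default": SketchConnection("localhost", 19530, "user", "secret"),
}
catalog = SketchCatalog(connections=connections)
print(catalog.get_connection().host)
```

Keeping credentials in one dictionary keyed by ID lets several adaptors share a single configuration while each one only names the entry it needs.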

Class Methods:

def list_collections(self):
Method: list_collections
Parameters: (none)
Description: Retrieves a list of all collections in the Milvus database using the MilvusClient.

def describe_collection(self,
    collection):
Method: describe_collection
Parameters: collection: str
Description: Retrieves the schema description of the specified collection in the Milvus database.

def insert(self,
    collection,
    data):
Method: insert
Parameters:
  • collection: str
  • data: any
Description: Inserts data into the specified collection in the Milvus database.

def search(self,
    collection,
    anns_field,
    output_fields: list[str],
    data,
    limit=5,
    search_params=None):
Method: search
Parameters:
  • collection: str
  • anns_field: str
  • output_fields: list[str]
  • data: any
  • limit: int = 5
  • search_params: dict|None
Description: Performs a vector similarity search on the specified collection using the provided data and parameters.
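
To make the roles of anns_field, output_fields and limit concrete, here is a small pure-Python sketch of a top-k similarity search over rows shaped as a list of dictionaries. It is a conceptual stand-in only: Milvus uses approximate nearest-neighbour indexes and a configurable metric, whereas this example ranks by brute-force cosine similarity. The function names and the sample rows are invented for illustration.

```python
import math
from typing import Any, Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(rows: List[Dict[str, Any]], anns_field: str, query: Sequence[float],
          output_fields: List[str], limit: int = 5) -> List[Dict[str, Any]]:
    """Rank rows by similarity of rows[anns_field] to query, keep the best ones."""
    ranked = sorted(rows,
                    key=lambda row: cosine_similarity(row[anns_field], query),
                    reverse=True)
    # Return only the requested fields, mirroring the idea of output_fields.
    return [{field: row[field] for field in output_fields}
            for row in ranked[:limit]]


# Illustrative rows: one dictionary per entity, with the vector in "embedding".
rows = [
    {"id": 1, "text": "apple", "embedding": [1.0, 0.0, 0.0]},
    {"id": 2, "text": "banana", "embedding": [0.0, 1.0, 0.0]},
    {"id": 3, "text": "cherry", "embedding": [0.9, 0.1, 0.0]},
]
hits = top_k(rows, "embedding", [1.0, 0.0, 0.0], ["id", "text"], limit=2)
print(hits)
```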

MariaDB Adaptor

The AQISMariaDB class is a specialized database client built on top of OpCatalogBase and is used for interacting with MariaDB. The interface supports operations on a MariaDB (or MySQL-compatible) database using SQLAlchemy and Dask.

It enables:

  • Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
  • Reading from and writing to the database using Dask DataFrames and executing raw SQL statements.

Imports and Dependencies:

from typing import Dict
from dask.dataframe import read_sql_query, to_sql
from dask.distributed import print
from sqlalchemy import TextClause
from pandas import DataFrame
import sqlalchemy
from aqis.operations import OpCatalogBase, Connection
  • Dict: Defines type annotations using dictionaries.
  • read_sql_query: Used to read data from SQL databases into a Dask DataFrame in parallel.
  • to_sql: Used to write a Dask DataFrame to a SQL table.
  • print: A special print function from dask.distributed.
  • TextClause: Represents an SQL text fragment or statement in SQLAlchemy.
  • DataFrame: Class from the Pandas library.
  • sqlalchemy: Imports the full SQLAlchemy package.
  • OpCatalogBase: A base class for managing multiple database connections.
  • Connection: The Connection object which represents connection details.

Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):

pip install dask sqlalchemy pymysql
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-mariadb.git
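
SQLAlchemy addresses a database through a URL of the form dialect+driver://user:password@host:port/database, with pymysql acting as the driver for MariaDB. The sketch below shows how such a URL can be assembled from connection details. How the adaptor itself builds its URL from a Connection object is not documented here, so build_sqlalchemy_url is a hypothetical helper and the credentials are placeholders.

```python
from urllib.parse import quote_plus


def build_sqlalchemy_url(driver: str, user: str, password: str,
                         host: str, port: int, database: str) -> str:
    """Assemble a SQLAlchemy-style database URL from its parts."""
    # Percent-encode the credentials so that characters such as "@" or "/"
    # cannot be mistaken for URL delimiters.
    return (f"{driver}://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{database}")


# 3306 is the default MariaDB port.
url = build_sqlalchemy_url("mysql+pymysql", "analyst", "p@ss/word",
                           "localhost", 3306, "demo")
print(url)
```

The same helper yields a PostgreSQL URL when called with the driver string "postgresql+psycopg2" and the matching port.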

Constructor:

class AQISMariaDB(OpCatalogBase):
    def __init__(self,
        db_connection_id: str = "db_default",
        connections: Dict[str, Connection] = {}) -> None:
Method: __init__
Parameters:
  • db_connection_id: str = "db_default"
  • connections: Dict[str, Connection] = {}
Description: Initializes the AQISMariaDB object with a database connection ID and an optional dictionary of connections.

Class Methods:

def read(self,
    alchemysql: sqlalchemy.Select,
    index_col: any,
    *args,
    print_dataframe=False,
    **kwargs) -> DataFrame:
Method: read
Parameters:
  • alchemysql: sqlalchemy.Select
  • index_col: any
  • *args
  • print_dataframe: bool = False
  • **kwargs
Description: Reads data from the MariaDB database using a SQLAlchemy Select query and returns a Dask DataFrame. If print_dataframe is True, the DataFrame is printed.

def exec_sql(self,
    sql: TextClause):
Method: exec_sql
Parameters: sql: sqlalchemy.TextClause
Description: Executes a raw SQL statement using SQLAlchemy. Begins a transaction, executes the SQL, commits, and closes the connection.

def write(self,
    dataframe: DataFrame,
    name: str,
    *args,
    **kwargs) -> int|None:
Method: write
Parameters:
  • dataframe: pandas.DataFrame
  • name: str
  • *args
  • **kwargs
Description: Writes a Pandas DataFrame to the MariaDB database table with the specified name using Dask's to_sql.
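
The exec_sql description (begin a transaction, execute, commit, close) follows a common pattern. The sketch below reproduces it with sqlite3 from the Python standard library so that it runs without a database server. The adaptor itself uses SQLAlchemy against MariaDB, and the rollback on failure shown here is an assumption of this sketch rather than documented adaptor behaviour.

```python
import os
import sqlite3
import tempfile


def exec_sql(database_path: str, sql: str) -> None:
    """Run one statement in its own transaction, then release the connection."""
    connection = sqlite3.connect(database_path)
    try:
        connection.execute(sql)
        connection.commit()        # make the change durable
    except Exception:
        connection.rollback()      # assumption: undo partial work on failure
        raise
    finally:
        connection.close()         # always release the connection


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.db")
    exec_sql(path, "CREATE TABLE events (id INTEGER PRIMARY KEY, label TEXT)")
    exec_sql(path, "INSERT INTO events (label) VALUES ('started')")
    # Read back through a separate connection to confirm the commit.
    check = sqlite3.connect(path)
    rows = check.execute("SELECT id, label FROM events").fetchall()
    check.close()
print(rows)
```

Opening and closing a connection per statement keeps each call independent, at the cost of connection overhead when many statements are executed in sequence.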

MongoDB Adaptor

The AQISMongoDB class is a specialized database client built on top of OpCatalogBase and is used for interacting with MongoDB. It uses PyMongo to perform standard database operations on a MongoDB instance.

It enables:

  • Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
  • Functionality for querying the database, listing databases and collections, inserting documents into the database, and for dropping collections from the database.

Imports and Dependencies:

from typing import Dict, Any
from aqis.operations import OpCatalogBase, Connection
from pymongo import MongoClient
  • Dict, Any: Dict and Any are types from Python’s typing module.
  • OpCatalogBase: The base class for managing multiple database connections.
  • Connection: An object representing a database connection.
  • MongoClient: The MongoClient class from the pymongo library.

Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):

pip install dask pymongo
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-mongodb.git

Constructor:

class AQISMongoDB(OpCatalogBase):
    def __init__(self, 
        mongo_connection_id: str = "mongo_default", 
        connections: Dict[str, Connection] = {}) -> None:
Method: __init__
Parameters:
  • mongo_connection_id: str = "mongo_default"
  • connections: Dict[str, Connection] = {}
Description: Initializes the AQISMongoDB object with a database connection ID and an optional dictionary of connections.

Class Methods:

def list_databases(self):
Method: list_databases
Parameters: (none)
Description: Returns a list of all database names available in the MongoDB instance.

def insert(self,
    database: str,
    collection: str,
    data: list[dict]):
Method: insert
Parameters:
  • database: str
  • collection: str
  • data: list[dict]
Description: Inserts a list of documents (data) into the specified collection of the given database.

def find(self,
    database: str,
    collection: str,
    query: dict = {},
    projection: dict = None) -> list[dict]:
Method: find
Parameters:
  • database: str
  • collection: str
  • query: dict = {}
  • projection: dict = None
Description: Finds documents in a collection matching the given query. Optionally specify a projection to limit fields.

def drop_collection(self,
    database: str,
    collection: str):
Method: drop_collection
Parameters:
  • database: str
  • collection: str
Description: Drops the specified collection from the given MongoDB database.
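
The query and projection arguments of find are plain dictionaries. The following in-memory sketch illustrates what they express, without needing a MongoDB server. It is a simplified stand-in that supports only scalar equality filters and simple field projections; real MongoDB queries support many more operators. The function find_in_memory and the sample documents are invented for this illustration.

```python
from typing import Any, Dict, List, Optional


def find_in_memory(documents: List[Dict[str, Any]],
                   query: Optional[Dict[str, Any]] = None,
                   projection: Optional[Dict[str, int]] = None) -> List[Dict[str, Any]]:
    """Filter documents by equality and optionally limit the returned fields."""
    query = query or {}
    matches = [doc for doc in documents
               if all(doc.get(key) == value for key, value in query.items())]
    if not projection:
        return [dict(doc) for doc in matches]

    included = {key for key, flag in projection.items() if flag}
    if included:
        # Inclusion mode: keep the listed fields; "_id" stays unless set to 0.
        keep_id = bool(projection.get("_id", 1))
        fields = included - {"_id"}
        return [{k: v for k, v in doc.items()
                 if k in fields or (k == "_id" and keep_id)}
                for doc in matches]

    # Exclusion mode: drop every field that is flagged with 0.
    excluded = set(projection)
    return [{k: v for k, v in doc.items() if k not in excluded}
            for doc in matches]


docs = [
    {"_id": 1, "name": "sensor-a", "site": "north", "value": 3.2},
    {"_id": 2, "name": "sensor-b", "site": "south", "value": 1.7},
    {"_id": 3, "name": "sensor-c", "site": "north", "value": 4.1},
]
names = find_in_memory(docs, {"site": "north"}, {"name": 1, "_id": 0})
print(names)
```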

OpenSearch Adaptor

The AQISOpenSearch class is a specialized client built on top of OpCatalogBase and is used for interacting with OpenSearch. The class uses opensearchpy to provide a high-level interface to an OpenSearch cluster.

It enables:

  • Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
  • Listing indexes, indexing documents, and executing search queries through opensearchpy.

Imports and Dependencies:

from typing import Dict, Any
from aqis.operations import OpCatalogBase, Connection
from opensearchpy import OpenSearch
  • Dict, Any: Dict and Any are types from Python’s typing module.
  • OpCatalogBase: The base class for managing multiple database connections.
  • Connection: An object representing connection details.
  • OpenSearch: The OpenSearch class from the opensearch-py package.

Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):

pip install dask opensearch-py
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-opensearch.git

Constructor:

class AQISOpenSearch(OpCatalogBase):
    def __init__(self, 
        os_connection_id: str = "opensearch_default", 
        connections: Dict[str, Connection] = {}) -> None:
Method: __init__
Parameters:
  • os_connection_id: str = "opensearch_default"
  • connections: Dict[str, Connection] = {}
Description: Initializes the OpenSearch interface with the given connection ID and optional connection dictionary.

Class Methods:

def list_indexes(self) -> list[str]:
Method: list_indexes
Parameters: (none)
Description: Returns a list of all index names in the connected OpenSearch cluster.

def index_doc(self,
    index: str,
    doc: Dict[str, Any]) -> dict:
Method: index_doc
Parameters:
  • index: str
  • doc: Dict[str, Any]
Description: Indexes a single document (doc) into the specified OpenSearch index.

def search(self,
    index: str,
    query: Dict[str, Any]) -> list[dict]:
Method: search
Parameters:
  • index: str
  • query: Dict[str, Any]
Description: Executes a search query against the specified index and returns the hits.
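
The query argument of search is a dictionary in the OpenSearch Query DSL. The sketch below builds a simple match query and shows where the hits sit in a raw search response. The helper names are hypothetical, the sample response is hand-written for illustration, and whether the adaptor returns full hit objects or only their _source documents is not stated in this section.

```python
from typing import Any, Dict, List


def match_query(field: str, text: str, size: int = 10) -> Dict[str, Any]:
    """Build a Query DSL body for a full-text match on a single field."""
    return {"size": size, "query": {"match": {field: text}}}


def sources_from_response(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract the stored documents from a raw search response."""
    # Hits are nested under response["hits"]["hits"]; each carries "_source".
    return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]


# Hand-written example of the response layout, not real cluster output.
sample_response = {
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_index": "articles", "_id": "1", "_score": 1.3,
             "_source": {"title": "Dask basics"}},
            {"_index": "articles", "_id": "2", "_score": 0.7,
             "_source": {"title": "Dask and SQL"}},
        ],
    }
}

body = match_query("title", "dask", size=2)
titles = [doc["title"] for doc in sources_from_response(sample_response)]
print(body)
print(titles)
```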

PostgreSQL Adaptor

The AQISPostgreSQL class is a specialized client built on top of OpCatalogBase and is used for interacting with PostgreSQL. It provides Dask-based access to PostgreSQL databases using SQLAlchemy and Dask.

It enables:

  • Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
  • Standard database operations such as reading from and writing to the database using Dask DataFrames, and executing raw SQL statements using sqlalchemy.

Imports and Dependencies:

from typing import Dict
from dask.dataframe import read_sql_query, to_sql
from dask.distributed import print
from sqlalchemy import TextClause
from pandas import DataFrame
import sqlalchemy
from aqis.operations import OpCatalogBase, Connection
  • Dict: Type annotations using dictionaries.
  • read_sql_query: Used to read data from SQL databases into a Dask DataFrame in parallel.
  • to_sql: Used to write a Dask DataFrame to a SQL table.
  • print: A special print function from dask.distributed.
  • TextClause: Represents a SQL text fragment or statement in SQLAlchemy.
  • DataFrame: The DataFrame class from the pandas library.
  • sqlalchemy: The entire SQLAlchemy library.
  • OpCatalogBase: The base class for managing multiple database connections.
  • Connection: An object representing connection details.

Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):

pip install dask sqlalchemy psycopg2-binary
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-postgres.git

Constructor:

class AQISPostgreSQL(OpCatalogBase):
    def __init__(self, 
        db_connection_id: str = "db_default", 
        connections: Dict[str, Connection] = {}) -> None:
Method: __init__
Parameters:
  • db_connection_id: str = "db_default"
  • connections: Dict[str, Connection] = {}
Description: Initializes the Postgres interface with the given connection ID and optional connection dictionary.

Class Methods:

def read(self,
    alchemysql: sqlalchemy.Select,
    index_col: any,
    *args,
    print_dataframe=False,
    **kwargs) -> DataFrame:
Method: read
Parameters:
  • alchemysql: sqlalchemy.Select
  • index_col: any
  • *args
  • print_dataframe: bool = False
  • **kwargs
Description: Reads data from PostgreSQL using a SQLAlchemy Select object. Supports Dask's parallelism. If print_dataframe is True, prints the resulting Dask DataFrame. Returns a Dask DataFrame.

def exec_sql(self,
    sql: TextClause):
Method: exec_sql
Parameters: sql: TextClause
Description: Executes a raw SQL command against the PostgreSQL database.

def write(self,
    dataframe: DataFrame,
    name: str,
    *args,
    **kwargs) -> int|None:
Method: write
Parameters:
  • dataframe: DataFrame
  • name: str
  • *args
  • **kwargs
Description: Writes a Dask or Pandas DataFrame to a PostgreSQL table with the given name. Returns the number of rows written or None.
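
The index_col argument of read matters because Dask partitions a SQL read along that column, so that each partition can be fetched by its own range query. The sketch below illustrates the idea with integer index values. It is a conceptual illustration with hypothetical helper names; Dask computes its actual divisions differently in detail.

```python
from typing import List, Tuple


def index_ranges(lower: int, upper: int, npartitions: int) -> List[Tuple[int, int]]:
    """Split the closed interval [lower, upper] into contiguous half-open ranges."""
    if npartitions < 1 or upper < lower:
        raise ValueError("need npartitions >= 1 and upper >= lower")
    span = upper - lower + 1
    npartitions = min(npartitions, span)   # never create empty partitions
    bounds = [lower + (span * i) // npartitions for i in range(npartitions + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


def range_predicates(index_col: str, ranges: List[Tuple[int, int]]) -> List[str]:
    """Render one WHERE predicate per partition."""
    return [f"{index_col} >= {lo} AND {index_col} < {hi}" for lo, hi in ranges]


ranges = index_ranges(1, 100, 4)
predicates = range_predicates("id", ranges)
for predicate in predicates:
    print(predicate)
```

An indexed, evenly distributed column is the natural choice for index_col, since each range query can then be answered efficiently and the partitions end up similar in size.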

S3 Adaptor

The AQISS3 class is a specialized client built on top of OpCatalogBase and is used for interacting with MinIO. It uses boto3 to work with S3-compatible object storage such as MinIO.

It enables:

  • Connection Management which manages and retrieves connection credentials (host, port, login info) using a centralized Connection system inherited from OpCatalogBase.
  • Functionality for file uploads, file downloads, and also listing operations using boto3.

Imports and Dependencies:

from typing import Dict
from pandas import DataFrame
import boto3
import os
from aqis.operations import OpCatalogBase, Connection
  • Dict: Type annotations using dictionaries.
  • DataFrame: The DataFrame class from the pandas library.
  • boto3: The boto3 library, which is the official AWS SDK for Python.
  • os: The Python standard library module for operating system interfaces.
  • OpCatalogBase: The base class for managing multiple database connections.
  • Connection: The Connection object which represents connection details.

Installation (including dependencies, assuming you are in a virtual environment with Python 3.12 or newer):

pip install dask boto3
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/aqis-engine-dask.git
pip install git+https://opencode.it4i.eu/exa4mind/platform/aqis/data-system-adaptors-dask/adaptor-s3.git

Constructor:

class AQISS3(OpCatalogBase):
    def __init__(self, 
        s3_connection_id:str = "s3_minio", 
        connections: Dict[str, Connection] = {}) -> None:
Method: __init__
Parameters:
  • s3_connection_id: str = "s3_minio"
  • connections: Dict[str, Connection] = {}
Description: Initializes the AQISS3 object with credentials and bucket info from a Connection. Sets up the boto3 client.

Class Methods:

def upload_file(self,
    file_path: str,
    object_name: str) -> None:
Method: upload_file
Parameters:
  • file_path: str
  • object_name: str
Description: Uploads a local file (file_path) to the S3 bucket using the specified object_name as the key.

def download_file(self,
    object_name: str,
    file_path: str) -> None:
Method: download_file
Parameters:
  • object_name: str
  • file_path: str
Description: Downloads a file from the S3 bucket (object_name) to a local path (file_path).

def list_files(self) -> list:
Method: list_files
Parameters: (none)
Description: Lists all object keys in the configured S3 bucket. Returns a list of file names.
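
An S3 object listing arrives in pages: each response holds the object entries under "Contents" (absent when nothing matches) and signals further pages through "IsTruncated" and "NextContinuationToken". The sketch below collects all keys across pages. Whether list_files paginates internally is not stated in this section, so this is an assumption-labelled illustration; list_all_keys is a hypothetical helper, and the fetch_page callable and the sample pages stand in for real boto3 calls.

```python
from typing import Any, Callable, Dict, List, Optional


def list_all_keys(fetch_page: Callable[[Optional[str]], Dict[str, Any]]) -> List[str]:
    """Collect object keys from a paginated S3-style listing."""
    keys: List[str] = []
    token: Optional[str] = None
    while True:
        page = fetch_page(token)
        # "Contents" is missing when the page holds no objects.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
        if not page.get("IsTruncated"):
            return keys
        token = page["NextContinuationToken"]


# Hand-written pages that mimic the response layout (not real bucket data).
pages = {
    None: {
        "Contents": [{"Key": "raw/a.csv"}, {"Key": "raw/b.csv"}],
        "IsTruncated": True,
        "NextContinuationToken": "t1",
    },
    "t1": {"Contents": [{"Key": "raw/c.csv"}], "IsTruncated": False},
}

all_keys = list_all_keys(lambda token: pages[token])
print(all_keys)
```

Passing the page-fetching step in as a callable keeps the loop testable without network access; with boto3, that callable would wrap the client's listing call.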