File Operations#

Example script intended to be run from the command line, where it will perform the specified file operations based on the provided arguments.

Example usage: python examples/file_operations_client.py --local-path=examples/basic/files/* --remote-path=hello --debug

import glob
import logging
import os
from pathlib import Path
from typing import Optional

import typer
from typing_extensions import Annotated

from ansys.hps.data_transfer.client import Client, DataTransferApi
from ansys.hps.data_transfer.client.authenticate import authenticate
from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath

log = logging.getLogger(__name__)


def run(api: DataTransferApi, local_path: str, remote_path: Optional[str] = None):
    if not remote_path:
        remote_path = Path(local_path).parent.name

    log.info("Query storages ...")
    storages = api.storages()
    storage_names = [f"{storage['name']}({storage['type']})" for storage in storages]
    log.info(f"Available storages: {storage_names}")

    log.info("Creating a directory ...")
    base_dir = "basic-example"
    mkdir_op = api.mkdir([StoragePath(path=f"{base_dir}")])
    api.wait_for([mkdir_op.id])

    log.info("Copying files ...")
    files = glob.glob(os.path.join(os.path.dirname(__file__), "files", "*.txt"))
    srcs = [StoragePath(path=file, remote="local") for file in files]
    dsts = [StoragePath(path=f"{base_dir}/{os.path.basename(file)}") for file in files]

    op = api.copy([SrcDst(src=src, dst=dst) for src, dst in zip(srcs, dsts)])
    op = api.wait_for([op.id])
    log.info(f"Operation {op[0].state}")

    ####################################
    # Listing files and getting metadata
    # ==================================

    log.info("Listing files ...")
    op = api.list([StoragePath(path=base_dir)])
    op = api.wait_for([op.id])
    log.info(f"Operation {op[0].state}")
    log.info(f"Files in {base_dir}: {op[0].result}")

    log.info("Getting metadata ...")
    op = api.get_metadata([StoragePath(path=f"{base_dir}/2.txt")])
    op = api.wait_for(op.id)
    md = op[0].result[f"{base_dir}/2.txt"]
    log.info(f"Metadata for {base_dir}/2.txt: {md}")

    log.info("Removing files ...")
    op = api.rmdir([StoragePath(path=base_dir)])
    op = api.wait_for([op.id])
    log.info(f"Operation {op[0].state}")


def main(
    local_path: Annotated[str, typer.Option(help="Path to the files or directory to transfer. Supports wildcards")],
    remote_path: Annotated[str, typer.Option(help="Optional path to the remote directory to transfer files to")] = None,
    debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False,
    url: Annotated[str, typer.Option(help="HPS URL to connect to")] = "https://localhost:8443/hps",
    username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin",
    password: Annotated[
        str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with")
    ] = "repadmin",
):

    logging.basicConfig(
        format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug else logging.INFO
    )

    dt_url = f"{url}/dt/api/v1"
    auth_url = f"{url}/auth/realms/rep"

    token = authenticate(username=username, password=password, verify=False, url=auth_url)
    token = token.get("access_token", None)
    assert token is not None

    log.info("Connecting to the data transfer service client..")
    client = Client(clean=True)

    client.binary_config.update(
        verbosity=3,
        debug=False,
        insecure=True,
        token=token,
        data_transfer_url=dt_url,
    )
    client.start()

    api = DataTransferApi(client)
    api.status(wait=True)
    storage_names = [f"{s['name']}({s['type']})" for s in api.storages()]
    log.info(f"Available storages: {storage_names}")

    run(api=api, local_path=local_path, remote_path=remote_path)

    client.stop()


if __name__ == "__main__":
    typer.run(main)