Note
Go to the end to download the full example code.
Set and query permissions
Example script to set and query permissions on files and directories using the data transfer service.
Example usage:
python examples/set_permissions_example.py --url=https://localhost:8443/hps --debug
import glob
import logging
import os
import pathlib
import traceback
from keycloak import KeycloakAdmin
import typer
from typing_extensions import Annotated
from ansys.hps.data_transfer.client import Client, DataTransferApi
from ansys.hps.data_transfer.client.authenticate import authenticate
from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath
from ansys.hps.data_transfer.client.models.permissions import (
Resource,
ResourceType,
RoleAssignment,
RoleType,
Subject,
SubjectType,
)
# Console logging for the "ansys.hps" logger: both the logger and its stream
# handler are opened up to DEBUG, and records are rendered as
# "[HH:MM:SS/LEVEL] message".
log = logging.getLogger("ansys.hps")
log.setLevel(logging.DEBUG)

stream_formatter = logging.Formatter(
    fmt="[%(asctime)s/%(levelname)5.5s] %(message)s",
    datefmt="%H:%M:%S",
)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(stream_formatter)

log.addHandler(stream_handler)
def get_user_id_from_keycloak(keycloak_url, username="repuser"):
    """Return the Keycloak user ID for ``username`` in the ``rep`` realm.

    Parameters
    ----------
    keycloak_url : str
        Base URL of the Keycloak server (for example ``https://host/hps/auth``).
        A trailing slash is optional.
    username : str, default: "repuser"
        Name of the user whose ID is looked up.

    Returns
    -------
    The value returned by ``KeycloakAdmin.get_user_id`` for ``username``.
    """
    # NOTE: demo-only settings. The admin credentials are hard-coded and
    # ``verify=False`` is passed through to the Keycloak client; do not reuse
    # this against a production deployment.
    admin = KeycloakAdmin(
        # Normalize so the server URL ends with exactly one "/".
        server_url=keycloak_url.rstrip("/") + "/",
        username="keycloak",
        password="keycloak123",
        realm_name="rep",
        user_realm_name="master",
        verify=False,
    )
    return admin.get_user_id(username)
def permissions(api: DataTransferApi, url: str):
    """Demonstrate granting, checking, and revoking permissions on a remote directory.

    The flow is:

    1. Collect the example files next to this script and try to copy them to
       ``permissions_demo`` with the caller's ``api`` (expected to fail for a
       user without write access).
    2. Start a second client authenticated as ``repadmin`` and grant the
       ``repuser`` Keycloak user the ``Writer`` role on ``permissions_demo``.
    3. Check the permission, retry the copy with ``api``, list the target
       directory, and download the files to a local ``downloaded`` folder.
    4. Remove the role assignment again and stop the admin client.

    Parameters
    ----------
    api : DataTransferApi
        API object of the (non-admin) user whose access is being demonstrated.
    url : str
        Base HPS URL; the Keycloak and data transfer URLs are derived from it.

    Raises
    ------
    RuntimeError
        If authenticating as ``repadmin`` does not yield an access token.
    """
    keycloak_url = f"{url}/auth"
    auth_url = f"{keycloak_url}/realms/rep"
    dt_url = f"{url}/dt/api/v1"

    log.info("### Checking binary's status ...")
    status = api.status(wait=True)
    log.info(f"### Client binary status: {status}")

    source_dir = pathlib.Path(__file__).parent / "files"
    source_dir = os.path.relpath(source_dir, os.getcwd())

    log.info(f"### Searching for files to copy in {source_dir} ...")
    files = glob.glob(os.path.join(source_dir, "**", "*.*"), recursive=True)
    log.info("Found files:")
    for file in files:
        log.info(f"- {file}")

    log.info("### Trying to copy files as 'repuser'...")
    target_dir = "permissions_demo"
    # Mirror the layout below ``source_dir`` underneath ``target_dir``.
    src_dst = [
        SrcDst(
            src=StoragePath(path=f, remote="local"),
            dst=StoragePath(
                path=os.path.normpath(
                    os.path.join(target_dir, os.path.relpath(os.path.dirname(f), source_dir), os.path.basename(f))
                ),
                remote="any",
            ),
        )
        for f in files
    ]
    log.info("### Operations to be executed: ")
    for sd in src_dst:
        log.info(f"- {sd.src.remote}:{sd.src.path} -> {sd.dst.remote}:{sd.dst.path}")

    try:
        # The operation will fail because 'repuser' does not have the necessary permissions
        api.copy(src_dst)
    except Exception as ex:
        log.error(f"Encountered error: {ex}")

    admin_token = authenticate(username="repadmin", password="repadmin", verify=False, url=auth_url)
    admin_token = admin_token.get("access_token", None)
    if admin_token is None:
        # Fail early instead of starting a client with no token.
        raise RuntimeError("Authentication as 'repadmin' did not return an access token")

    log.info("### Preparing data transfer client for 'repadmin' ...")
    admin_client = Client()
    admin_client.binary_config.update(
        verbosity=3,
        debug=False,
        insecure=True,
        token=admin_token,
        data_transfer_url=dt_url,
    )
    admin_client.start()

    # From here on the admin client is running; make sure it is always stopped.
    try:
        ###################################
        # Create a DataTransferApi instance
        # =================================
        admin = DataTransferApi(admin_client)
        admin.status(wait=True)

        log.info("### Granting 'repuser' the necessary permissions ...")
        user_id = get_user_id_from_keycloak(keycloak_url)

        # One role assignment shared by set/check/remove so that exactly the
        # permission that was granted is the one that gets revoked.
        writer_assignment = [
            RoleAssignment(
                resource=Resource(path=target_dir, type=ResourceType.Doc),
                role=RoleType.Writer,
                subject=Subject(id=user_id, type=SubjectType.User),
            )
        ]

        try:
            admin.set_permissions(writer_assignment)
        except Exception as ex:
            log.info(ex)

        try:
            log.info("### Verifying permissions for 'repuser' ...")
            resp = admin.check_permissions(writer_assignment)
            log.info(f"### Is 'repuser'({user_id}) allowed to write to {target_dir}? -> {resp.allowed}")

            log.info("### Trying to copy files as 'repuser' one more time...")
            resp = api.copy(src_dst)
            op = api.wait_for([resp.id], timeout=None)[0]
            log.info(f"### Copy operation state: {op.state}")

            log.info("### Listing files in the target directory as 'repadmin' ...")
            resp = admin.list([StoragePath(path=target_dir, remote="any")])
            op = admin.wait_for([resp.id], timeout=10)[0]
            log.info(f"### Result: {op.result}")

            # Use a separate name for the local folder: ``target_dir`` must keep
            # pointing at the remote directory for the cleanup in ``finally``.
            download_dir = os.path.join(os.path.dirname(__file__), "downloaded")
            log.info(f"### Downloading files to {download_dir}...")
            download_ops = [
                SrcDst(
                    src=sd.dst,
                    dst=StoragePath(remote="local", path=os.path.join(download_dir, os.path.basename(sd.dst.path))),
                )
                for sd in src_dst
            ]
            resp = admin.copy(download_ops)
            op = admin.wait_for([resp.id], timeout=300)[0]
            log.info(f"### Copy operation state: {op.state}")
        except Exception as ex:
            log.debug(traceback.format_exc())
            log.error(f"Error: {ex}")
        finally:
            log.info("Removing permissions for 'repuser' ...")
            admin.remove_permissions(writer_assignment)

        log.info("And that's all folks!")
    finally:
        admin_client.stop()
def main(
    debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False,
    url: Annotated[str, typer.Option(help="HPS URL to connect to")] = "https://localhost:8443/hps",
    username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin",
    password: Annotated[
        str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with")
    ] = "repadmin",
):
    """Authenticate, start a data transfer client, and run the permissions demo.

    Parameters
    ----------
    debug : bool
        When true, the root logging level is set to DEBUG instead of INFO.
    url : str
        Base HPS URL; the data transfer and authentication URLs are derived from it.
    username : str
        User name used to obtain the access token.
    password : str
        Password used to obtain the access token.

    Raises
    ------
    RuntimeError
        If the authentication response does not contain an access token.
    """
    logging.basicConfig(
        format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug else logging.INFO
    )

    dt_url = f"{url}/dt/api/v1"
    auth_url = f"{url}/auth/realms/rep"

    token = authenticate(username=username, password=password, verify=False, url=auth_url)
    token = token.get("access_token", None)
    if token is None:
        # Explicit check rather than ``assert`` so it still runs under ``python -O``.
        raise RuntimeError("Authentication did not return an access token")

    log.info("Connecting to the data transfer service client..")
    client = Client(clean=True)
    client.binary_config.update(
        verbosity=3,
        debug=False,
        insecure=True,
        token=token,
        data_transfer_url=dt_url,
    )
    client.start()

    # Stop the client even when the demo raises.
    try:
        api = DataTransferApi(client)
        api.status(wait=True)

        storage_names = [f"{s['name']}({s['type']})" for s in api.storages()]
        log.info(f"Available storages: {storage_names}")

        permissions(api=api, url=url)
    finally:
        client.stop()


if __name__ == "__main__":
    typer.run(main)