Source code for pgclone.ls_cmd

import os
import subprocess

import boto3

from pgclone import database
from pgclone import settings
from pgclone.utils import is_valid_dump_key


def _get_relative_path(storage_location, path):
    """
    Given a storage location and an absolute path, return
    the relative path (i.e. the dump key)
    """
    prefix_len = len(storage_location)
    return path[prefix_len:]


[docs]def ls(db_name=None, only_db_names=False, local=False): """ Lists dump keys. Args: db_name (str, default=None): List dump keys only for this database name only_db_names (boolean, default=False): Lists only the unique database names associated with the dump keys local (bool, default=False): Only list local restore keys. Returns: List[str]: The list of dump keys (or database names). """ settings.validate() storage_location = settings.get_storage_location() conn_db_url = database.get_url(database.make_config('postgres')) if local: cmd = f'psql {conn_db_url} -lqt | cut -d \\| -f 1' stdout = subprocess.run( cmd, shell=True, stdout=subprocess.PIPE ).stdout.decode() return [ f':{db_name.strip()}' for db_name in stdout.split('\n') if db_name.strip() ] if storage_location.startswith('s3://'): # pragma: no cover bucket_name, prefix = storage_location[5:].split('/', 2) bucket = boto3.resource('s3').Bucket(bucket_name) abs_paths = [ f's3://{obj.bucket_name}/{obj.key}' for obj in bucket.objects.filter(Prefix=prefix) if not obj.key.endswith('/') ] else: abs_paths = [ os.path.join(dirpath, file_name) for dirpath, _, file_names in os.walk(storage_location) for file_name in file_names ] # Return paths relative to the storage location dump_keys = [ _get_relative_path(storage_location, path) for path in abs_paths if is_valid_dump_key(_get_relative_path(storage_location, path)) ] if db_name: dump_keys = [ dump_key for dump_key in dump_keys if dump_key.startswith(f'{db_name}/') ] if only_db_names: return sorted( list({dump_key.split('/', 1)[0] for dump_key in dump_keys}) ) else: return sorted(dump_keys, reverse=True)