import os
from django.db import connection
import pgconnection
from pgclone import command
from pgclone import database
from pgclone import exceptions
from pgclone import ls
from pgclone import settings
from pgclone.utils import is_valid_dump_key
from pgclone.utils import print_msg
def get_latest_dump_key(db_name):
"""
Given a database name, lists all of the dumps and finds the most
recent dump key.
"""
sorted_dumps = sorted(ls(db_name=db_name))
return sorted_dumps[-1] if sorted_dumps else None
def _db_exists(db):
"""Returns True if the database exists"""
conn_db = database.make_config('postgres')
db_url = database.get_url(conn_db)
try:
command.run_shell(
f'psql {db_url} -lqt | cut -d \\| -f 1 | grep -qw {db["NAME"]}'
)
return True
except RuntimeError:
return False
def _make_restore_config(*, pre_swap_hooks=None):
"""Make a pgclone restore config"""
return {'pre_swap_hooks': pre_swap_hooks or []}
def _get_restore_config(restore_config_name):
"""Get a pgclone restore config by its name"""
if restore_config_name not in settings.get_restore_configs():
raise exceptions.ConfigurationError(
f'"{restore_config_name}" is not a valid restore configuration'
' in settings.PGCLONE_RESTORE_CONFIGS.'
)
return _make_restore_config(
**settings.get_restore_configs()[restore_config_name]
)
def _kill_connections_to_database(db):
conn_db = database.make_config('postgres')
kill_connections_sql = f'''
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = '{db["NAME"]}'
AND pid <> pg_backend_pid()
'''
command.run_psql(kill_connections_sql, db=conn_db)
def _drop_db(db):
conn_db = database.make_config('postgres')
_kill_connections_to_database(db)
drop_sql = f'DROP DATABASE IF EXISTS "{db["NAME"]}"'
command.run_psql(drop_sql, db=conn_db)
def _set_search_path(db, conn_db):
"""
pg_restore does not restore the original search_path variable of the
database, which can cause issues with extensions. This function obtains
the original search path and sets it as the search path of the restored
database
https://www.postgresql.org/message-id/CAKFQuwbWpd1DXA_YY3zXRKGKKLL1ZTi9MHDQ15zJ8ufVBwVJEA%40mail.gmail.com
"""
with connection.cursor() as cursor:
cursor.execute('SHOW search_path;')
search_path = cursor.fetchone()[0]
set_search_path_sql = (
f'ALTER DATABASE "{db["NAME"]}"' f' SET search_path to {search_path}'
)
command.run_psql(set_search_path_sql, db=conn_db)
def _local_restore(db_name_or_dump_key, *, conn_db, temp_db, curr_db, prev_db):
"""
Performs a restore using a local database
"""
if db_name_or_dump_key == ':current':
local_restore_db = curr_db
elif db_name_or_dump_key == ':previous':
local_restore_db = prev_db
else:
local_restore_db = database.make_config(db_name_or_dump_key[1:])
if not _db_exists(local_restore_db):
raise RuntimeError(
f'local database {local_restore_db["NAME"]} does not exist.'
' Use "pgclone ls --local" to see local database keys.'
)
# Perform the local restore process. It is completely valid to
# try to restore the temp_db, so do nothing if this database
# is provided by the user
if local_restore_db != temp_db: # pragma: no branch
_drop_db(temp_db)
create_temp_sql = f'''
CREATE DATABASE {temp_db["NAME"]}
TEMPLATE {local_restore_db["NAME"]}
'''
command.run_psql(create_temp_sql, db=conn_db)
_set_search_path(temp_db, conn_db)
return db_name_or_dump_key
def _remote_restore(db_name_or_dump_key, *, temp_db, conn_db):
# We are restoring from a remote dump. If the dump key is not valid,
# assume it is the latest dump of a database name and get the latest
# dump key
dump_key = db_name_or_dump_key
if not is_valid_dump_key(dump_key):
dump_key = get_latest_dump_key(db_name_or_dump_key)
if not dump_key:
raise RuntimeError(
'Could not find a dump for database name'
f' "{db_name_or_dump_key}"'
)
storage_location = settings.get_storage_location()
file_path = os.path.join(storage_location, dump_key)
print_msg(f'Creating the temporary restore db')
_drop_db(temp_db)
create_temp_sql = f'CREATE DATABASE "{temp_db["NAME"]}"'
command.run_psql(create_temp_sql, db=conn_db)
_set_search_path(temp_db, conn_db)
print_msg(f'Running pg_restore on "{dump_key}"')
pg_restore_cmd = (
'pg_restore --verbose --no-acl --no-owner '
f'-d \'{database.get_url(temp_db)}\''
)
# When restoring, we need to ignore errors because there are certain
# errors we cannot get around when pg restoring some DBs (like Aurora).
# In the future, we may parse the output of the pg_restore command to see
# if an unexpected error happened.
if storage_location.startswith('s3://'): # pragma: no cover
command.run_shell(
f'aws s3 cp {file_path} - | {pg_restore_cmd}', ignore_errors=True
)
else:
command.run_shell(f'{pg_restore_cmd} {file_path}', ignore_errors=True)
return dump_key
[docs]def restore(
db_name_or_dump_key,
pre_swap_hooks=None,
restore_config_name=None,
reversible=False,
):
"""
Restores a database dump
Args:
db_name_or_dump_key (str): When a database name
is provided, finds the most recent dump for that
database and restores it. Restores a specific dump
when a dump key is provided.
pre_swap_hooks (List[str], default=None): The list of pre_swap hooks to
run before swapping the temp restore database with the
main database. The strings are management command names.
restore_config_name (str, default=None): The name of the restore
config to use when running the restore. Configured
in settings.PGCLONE_RESTORE_CONFIGS
Returns:
str: The dump key that was restored.
"""
settings.validate()
if not settings.get_allow_restore(): # pragma: no cover
raise RuntimeError('Restore not allowed')
if restore_config_name and pre_swap_hooks: # pragma: no cover
raise ValueError(
'Cannot pass in pre_swap_hooks when using a restore config'
)
elif pre_swap_hooks:
restore_config_name = '_custom'
restore_config = _make_restore_config(pre_swap_hooks=pre_swap_hooks)
else:
restore_config_name = restore_config_name or 'default'
restore_config = _get_restore_config(restore_config_name)
# Restore works in the following steps with the following databases:
# 1. Create the temp_db database to perform the restore without
# affecting the default_db
# 2. Call pg_restore on temp_db
# 3. Apply any pre_swap_hooks to temp_db
# 4. Terminate all connections on default_db so that we
# can swap in the temp_db
# 4. Rename default_db to prev_db
# 5. Rename temp_db to default_db
# 6. Delete prev_db
#
# Database variable names below reflect this process. We use the
# "postgres" database (i.e. conn_db) as the connection database
# when using psql for most of these commands
default_db = database.get_default_config()
temp_db = database.make_config(default_db['NAME'] + '__temp')
prev_db = database.make_config(default_db['NAME'] + '__prev')
curr_db = database.make_config(default_db['NAME'] + '__curr')
conn_db = database.make_config('postgres')
local_restore_db = None
if db_name_or_dump_key.startswith(':'):
dump_key = _local_restore(
db_name_or_dump_key,
conn_db=conn_db,
temp_db=temp_db,
curr_db=curr_db,
prev_db=prev_db,
)
else:
dump_key = _remote_restore(
db_name_or_dump_key, temp_db=temp_db, conn_db=conn_db
)
# When in reversible mode, make a special __curr db snapshot.
# Avoid this if we are restoring the current db
if reversible and local_restore_db != curr_db:
print_msg('Creating snapshot for reversible restore')
_drop_db(curr_db)
create_current_db_sql = f'''
CREATE DATABASE "{curr_db['NAME']}"
WITH TEMPLATE
"{temp_db['NAME']}"
'''
command.run_psql(create_current_db_sql, db=conn_db)
# pre-swap hook step
with pgconnection.route(temp_db):
for management_command_name in restore_config['pre_swap_hooks']:
print_msg(
f'Running "manage.py {management_command_name}" pre_swap hook'
)
command.run_management(management_command_name)
# swap step
print_msg('Swapping the restored copy with the primary database')
_drop_db(prev_db)
_kill_connections_to_database(default_db)
alter_db_sql = f'''
ALTER DATABASE "{default_db['NAME']}" RENAME TO
"{prev_db['NAME']}"
'''
# There's a scenario where the default DB may not exist before running
# this, so just ignore errors on this command
command.run_psql(alter_db_sql, db=conn_db, ignore_errors=True)
rename_sql = f'''
ALTER DATABASE "{temp_db["NAME"]}"
RENAME TO "{default_db["NAME"]}"
'''
command.run_psql(rename_sql, db=conn_db)
if not reversible:
print_msg('Cleaning old pgclone resources')
_drop_db(curr_db)
_drop_db(prev_db)
print_msg(f'Successfully restored dump "{dump_key}"')
return dump_key