From 7d5a417a6a56c3740f99f7cd1c9ad10fef81ad67 Mon Sep 17 00:00:00 2001 From: badra001 Date: Fri, 27 Feb 2026 15:51:15 -0500 Subject: [PATCH] rename --- .../route53-migration/delete_zone.py | 395 ++++++++++++++++++ .../route53-migration/describe_zone.py | 235 +++++++++++ .../route53-migration/disassociate_vpcs.py | 241 +++++++++++ .../route53-migration/example.txt | 99 +++++ .../route53-migration/import_records.py | 250 +++++++++++ .../route53-migration/list_records.py | 158 +++++++ 6 files changed, 1378 insertions(+) create mode 100755 local-app/python-tools/route53-migration/delete_zone.py create mode 100755 local-app/python-tools/route53-migration/describe_zone.py create mode 100755 local-app/python-tools/route53-migration/disassociate_vpcs.py create mode 100644 local-app/python-tools/route53-migration/example.txt create mode 100755 local-app/python-tools/route53-migration/import_records.py create mode 100755 local-app/python-tools/route53-migration/list_records.py diff --git a/local-app/python-tools/route53-migration/delete_zone.py b/local-app/python-tools/route53-migration/delete_zone.py new file mode 100755 index 00000000..1f072ce2 --- /dev/null +++ b/local-app/python-tools/route53-migration/delete_zone.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python3 +""" +delete_zone.py v1.0.0 + +Remove all deletable resource records from a Route 53 hosted zone, +then optionally delete the zone itself. + +SOA and apex NS records are skipped — AWS requires them to be present +and will reject attempts to delete them. + +Safety backups of all records and zone metadata are always written +before any destructive action is taken. + +Usage: + # Dry run — show what would be deleted, write backups + python delete_zone.py --zone-id Z1234567890ABC --profile source-account --dry-run + + # Empty the zone (delete records, keep zone) + python delete_zone.py --zone-id Z1234567890ABC --profile source-account + + # Empty and delete the zone + python delete_zone.py --zone-id Z1234567890ABC --profile source-account --delete-zone + + # Non-interactive (no confirmation prompts) + python delete_zone.py --zone-id Z1234567890ABC --profile source-account --delete-zone --yes +""" + +import argparse +import boto3 +import json +import os +import sys +import time +from datetime import datetime, timezone +from typing import Optional + +VERSION = "1.0.0" + +SKIP_TYPES = {'SOA', 'NS'} +BATCH_SIZE = 100 + + +def get_client(profile: Optional[str], region: str, service: str = 'route53'): + session = boto3.Session(profile_name=profile, region_name=region) + return session.client(service) + + +def get_account_id(profile: Optional[str]) -> str: + try: + session = boto3.Session(profile_name=profile) + return session.client('sts').get_caller_identity()['Account'] + except Exception as e: + return f"UNKNOWN ({e})" + + +def get_zone_details(client, zone_id: str) -> dict: + try: + response = client.get_hosted_zone(Id=zone_id) + except client.exceptions.NoSuchHostedZone: + print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"ERROR fetching zone details: {e}", file=sys.stderr) + sys.exit(1) + + return { + 'HostedZone': response['HostedZone'], + 'DelegationSet': response.get('DelegationSet', {}), + 'VPCs': response.get('VPCs', []), + } + + +def get_all_records(client, zone_id: str) -> list: + records = [] + paginator = client.get_paginator('list_resource_record_sets') + for page in paginator.paginate(HostedZoneId=zone_id): + records.extend(page['ResourceRecordSets']) + return records + + +def get_tags(client, zone_id: str) -> dict: + try: + response = client.list_tags_for_resource( + ResourceType='hostedzone', + ResourceId=zone_id + ) + return {tag['Key']: tag['Value'] for tag in response['ResourceTagSet']['Tags']} + except Exception as e: + print(f"WARNING: Could not fetch tags: {e}", file=sys.stderr) + return {} + + +# --------------------------------------------------------------------------- +# Safety backups +# --------------------------------------------------------------------------- + +def backup_filename(zone_id: str, suffix: str, output_dir: str) -> str: + ts = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ') + return os.path.join(output_dir, f"{zone_id}_{suffix}_{ts}.json") + + +def write_records_backup(records: list, zone_id: str, output_dir: str) -> str: + path = backup_filename(zone_id, 'records', output_dir) + data = { + 'SchemaVersion': '1.0', + 'ToolVersion': VERSION, + 'HostedZoneId': zone_id, + 'RecordCount': len(records), + 'ExportedAt': datetime.now(timezone.utc).isoformat(), + 'ResourceRecordSets': records, + } + with open(path, 'w') as f: + json.dump(data, f, indent=2, default=str) + print(f" Records backup : {path}") + return path + + +def write_zone_backup(details: dict, tags: dict, account_id: str, + zone_id: str, output_dir: str) -> str: + path = backup_filename(zone_id, 'zone', output_dir) + hz = details['HostedZone'] + config = hz.get('Config', {}) + delegation = details.get('DelegationSet', {}) + + data = { + 'SchemaVersion': '1.0', + 'ToolVersion': VERSION, + 'AccountId': account_id, + 'ExportedAt': datetime.now(timezone.utc).isoformat(), + 'HostedZone': { + 'Id': zone_id, + 'Name': hz.get('Name', ''), + 'Comment': config.get('Comment', ''), + 'PrivateZone': config.get('PrivateZone', False), + 'CallerReference': hz.get('CallerReference', ''), + 'ResourceRecordSetCount': hz.get('ResourceRecordSetCount', 'N/A'), + }, + 'DelegationSet': { + 'Id': delegation.get('Id', ''), + 'CallerReference': delegation.get('CallerReference', ''), + 'NameServers': delegation.get('NameServers', []), + }, + 'AssociatedVPCs': [ + {'VPCId': v.get('VPCId', ''), 'VPCRegion': v.get('VPCRegion', '')} + for v in details.get('VPCs', []) + ], + 'Tags': tags, + } + with open(path, 'w') as f: + json.dump(data, f, indent=2, default=str) + print(f" Zone backup : {path}") + return path + + +# --------------------------------------------------------------------------- +# Record deletion +# --------------------------------------------------------------------------- + +def partition_records(records: list, zone_name: str) -> tuple[list, list]: + """ + Split records into deletable and skipped (SOA + apex NS). + zone_name should be the fully-qualified zone apex (e.g. 'example.com.'). + """ + deletable = [] + skipped = [] + + for record in records: + rtype = record['Type'] + rname = record['Name'] + + # Always skip SOA + if rtype == 'SOA': + skipped.append(record) + continue + + # Skip apex NS (AWS-managed); non-apex NS delegations are deletable + if rtype == 'NS' and rname == zone_name: + skipped.append(record) + continue + + deletable.append(record) + + return deletable, skipped + + +def build_delete_batch(records: list) -> dict: + return { + 'Changes': [ + {'Action': 'DELETE', 'ResourceRecordSet': r} + for r in records + ] + } + + +def wait_for_change(client, change_id: str, timeout: int = 120): + print(f" Waiting for change {change_id}", end='', flush=True) + deadline = time.time() + timeout + while time.time() < deadline: + status = client.get_change(Id=change_id)['ChangeInfo']['Status'] + if status == 'INSYNC': + print(" done.") + return + print('.', end='', flush=True) + time.sleep(5) + print(f"\n WARNING: Change {change_id} did not reach INSYNC within {timeout}s.") + + +def delete_records(client, zone_id: str, records: list, dry_run: bool) -> dict: + results = {'succeeded': [], 'failed': []} + total = len(records) + + if total == 0: + print("\n No deletable records found.") + return results + + print(f"\n Deleting {total} records in batches of {BATCH_SIZE}...") + + for i in range(0, total, BATCH_SIZE): + chunk = records[i:i + BATCH_SIZE] + batch_num = (i // BATCH_SIZE) + 1 + print(f"\n Batch {batch_num} ({len(chunk)} records):") + for r in chunk: + print(f" DELETE {r['Type']:<8} {r['Name']}") + + if dry_run: + print(f" [DRY RUN] Skipping actual deletion.") + results['succeeded'].extend([r['Name'] for r in chunk]) + continue + + try: + batch = build_delete_batch(chunk) + response = client.change_resource_record_sets( + HostedZoneId=zone_id, + ChangeBatch=batch + ) + change_id = response['ChangeInfo']['Id'].split('/')[-1] + wait_for_change(client, change_id) + results['succeeded'].extend([r['Name'] for r in chunk]) + except Exception as e: + print(f" ERROR in batch {batch_num}: {e}", file=sys.stderr) + results['failed'].extend([r['Name'] for r in chunk]) + + return results + + +# --------------------------------------------------------------------------- +# Zone deletion +# --------------------------------------------------------------------------- + +def delete_hosted_zone(client, zone_id: str, dry_run: bool) -> bool: + if dry_run: + print(f"\n [DRY RUN] Would delete hosted zone {zone_id}.") + return True + try: + client.delete_hosted_zone(Id=zone_id) + print(f"\n Hosted zone {zone_id} deleted successfully.") + return True + except client.exceptions.HostedZoneNotEmpty as e: + print(f"\n ERROR: Zone is not empty — cannot delete: {e}", file=sys.stderr) + return False + except Exception as e: + print(f"\n ERROR deleting zone: {e}", file=sys.stderr) + return False + + +# --------------------------------------------------------------------------- +# Confirmation prompt +# --------------------------------------------------------------------------- + +def confirm(prompt: str) -> bool: + while True: + answer = input(prompt + " [yes/no]: ").strip().lower() + if answer == 'yes': + return True + elif answer == 'no': + return False + print(" Please type 'yes' or 'no'.") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description=f'Empty a Route 53 hosted zone and optionally delete it. v{VERSION}' + ) + parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') + parser.add_argument('--profile', help='AWS profile name') + parser.add_argument('--region', default='us-east-1', help='AWS region (default: us-east-1)') + parser.add_argument('--delete-zone', action='store_true', + help='Delete the hosted zone after emptying it') + parser.add_argument('--dry-run', action='store_true', + help='Show what would happen without making any changes') + parser.add_argument('--yes', action='store_true', + help='Skip confirmation prompts') + parser.add_argument('--output-dir', default='.', + help='Directory for backup JSON files (default: current directory)') + args = parser.parse_args() + + zone_id = args.zone_id.split('/')[-1] + client = get_client(args.profile, args.region) + account_id = get_account_id(args.profile) + details = get_zone_details(client, zone_id) + tags = get_tags(client, zone_id) + records = get_all_records(client, zone_id) + + hz = details['HostedZone'] + zone_name = hz.get('Name', '') + is_private = hz.get('Config', {}).get('PrivateZone', False) + + print(f"\ndelete_zone.py v{VERSION}") + print("=" * 60) + print("ZONE DELETION PLAN") + print("=" * 60) + print(f"\n{'Zone ID':<22} {zone_id}") + print(f"{'Zone Name':<22} {zone_name}") + print(f"{'Account ID':<22} {account_id}") + print(f"{'Private Zone':<22} {is_private}") + print(f"{'Total Records':<22} {len(records)}") + + deletable, skipped = partition_records(records, zone_name) + print(f"{'Deletable Records':<22} {len(deletable)}") + print(f"{'Skipped (SOA/NS)':<22} {len(skipped)}") + print(f"{'Delete Zone':<22} {args.delete_zone}") + print(f"{'Dry Run':<22} {args.dry_run}") + + # Always write safety backups before touching anything + print("\n--- Safety Backups " + "-" * 40) + if args.dry_run: + print(" [DRY RUN] Backups would be written here:") + print(f" Records backup : {backup_filename(zone_id, 'records', args.output_dir)}") + print(f" Zone backup : {backup_filename(zone_id, 'zone', args.output_dir)}") + else: + os.makedirs(args.output_dir, exist_ok=True) + write_records_backup(records, zone_id, args.output_dir) + write_zone_backup(details, tags, account_id, zone_id, args.output_dir) + + # Confirm record deletion + print("\n--- Record Deletion " + "-" * 40) + if deletable: + for r in sorted(deletable, key=lambda x: (x['Name'], x['Type'])): + print(f" DELETE {r['Type']:<8} {r['Name']}") + else: + print(" Nothing to delete.") + + if skipped: + print(f"\n Skipping (AWS-managed):") + for r in skipped: + print(f" KEEP {r['Type']:<8} {r['Name']}") + + if deletable: + if not args.dry_run and not args.yes: + if not confirm(f"\nDelete {len(deletable)} records from zone {zone_id}?"): + print("Aborted.") + sys.exit(0) + + delete_results = delete_records(client, zone_id, deletable, args.dry_run) + else: + delete_results = {'succeeded': [], 'failed': []} + + # Confirm zone deletion + if args.delete_zone: + print("\n--- Zone Deletion " + "-" * 42) + if delete_results['failed']: + print(" WARNING: Some record deletions failed. Skipping zone deletion to avoid data loss.") + else: + if not args.dry_run and not args.yes: + if not confirm(f"Permanently delete hosted zone {zone_id} ({zone_name})?"): + print(" Zone deletion skipped.") + else: + delete_hosted_zone(client, zone_id, args.dry_run) + else: + delete_hosted_zone(client, zone_id, args.dry_run) + + # Summary + print("\n--- Summary " + "-" * 47) + if args.dry_run: + print(f" Would delete records : {len(deletable)}") + print(f" Would keep (SOA/NS) : {len(skipped)}") + if args.delete_zone: + print(f" Would delete zone : {zone_id}") + else: + print(f" Records succeeded : {len(delete_results['succeeded'])}") + print(f" Records failed : {len(delete_results['failed'])}") + print(f" Kept (SOA/NS) : {len(skipped)}") + if delete_results['failed']: + print(f"\n Failed records:") + for name in delete_results['failed']: + print(f" {name}") + print() + + +if __name__ == '__main__': + main() diff --git a/local-app/python-tools/route53-migration/describe_zone.py b/local-app/python-tools/route53-migration/describe_zone.py new file mode 100755 index 00000000..b324f13a --- /dev/null +++ b/local-app/python-tools/route53-migration/describe_zone.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +describe_zone.py v1.0.0 + +Describe a Route 53 hosted zone in detail: metadata, record count, +delegation set, associated VPCs, tags, and DNSSEC status. + +Usage: + python describe_zone.py --zone-id Z1234567890ABC [--profile myprofile] [--output-json zone_info.json] +""" + +import argparse +import boto3 +import json +import sys +from typing import Optional + +VERSION = "1.0.0" + + +def get_client(profile: Optional[str], region: str): + session = boto3.Session(profile_name=profile, region_name=region) + return session.client('route53') + + +def get_zone_details(client, zone_id: str) -> dict: + """Fetch core hosted zone metadata.""" + try: + response = client.get_hosted_zone(Id=zone_id) + except client.exceptions.NoSuchHostedZone: + print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"ERROR fetching zone details: {e}", file=sys.stderr) + sys.exit(1) + + return { + 'HostedZone': response['HostedZone'], + 'DelegationSet': response.get('DelegationSet', {}), + 'VPCs': response.get('VPCs', []), + } + + +def get_record_count(client, zone_id: str) -> int: + """ + Return the exact record count by paginating all RRs. + The ResourceRecordSetCount in GetHostedZone is an estimate and can lag. + """ + count = 0 + paginator = client.get_paginator('list_resource_record_sets') + for page in paginator.paginate(HostedZoneId=zone_id): + count += len(page['ResourceRecordSets']) + return count + + +def get_tags(client, zone_id: str) -> dict: + """Fetch tags for the hosted zone.""" + try: + response = client.list_tags_for_resource( + ResourceType='hostedzone', + ResourceId=zone_id + ) + return {tag['Key']: tag['Value'] for tag in response['ResourceTagSet']['Tags']} + except Exception as e: + print(f"WARNING: Could not fetch tags: {e}", file=sys.stderr) + return {} + + +def get_dnssec_status(client, zone_id: str) -> dict: + """Fetch DNSSEC signing status (public zones only).""" + try: + response = client.get_dnssec(HostedZoneId=zone_id) + return { + 'Status': response['Status'].get('ServeSignature', 'UNKNOWN'), + 'KeySigningKeys': [ + { + 'Name': ksk['Name'], + 'Status': ksk['Status'], + 'Algorithm': ksk.get('SigningAlgorithmMnemonic', ''), + 'CreatedOn': str(ksk.get('CreatedDate', '')), + } + for ksk in response.get('KeySigningKeys', []) + ] + } + except client.exceptions.NoSuchHostedZone: + return {'Status': 'NOT_APPLICABLE', 'KeySigningKeys': []} + except Exception: + # Private zones and GovCloud may not support DNSSEC + return {'Status': 'NOT_APPLICABLE', 'KeySigningKeys': []} + + +def get_account_id(profile: Optional[str]) -> str: + """Resolve the AWS account ID for the active credentials.""" + try: + session = boto3.Session(profile_name=profile) + sts = session.client('sts') + return sts.get_caller_identity()['Account'] + except Exception as e: + return f"UNKNOWN ({e})" + + +def build_report(zone_id: str, details: dict, record_count: int, tags: dict, + dnssec: dict, account_id: str) -> dict: + """Assemble all collected data into a structured report dict.""" + hz = details['HostedZone'] + config = hz.get('Config', {}) + delegation = details.get('DelegationSet', {}) + vpcs = details.get('VPCs', []) + + return { + 'SchemaVersion': '1.0', + 'ToolVersion': VERSION, + 'AccountId': account_id, + 'HostedZone': { + 'Id': zone_id, + 'Name': hz.get('Name', ''), + 'Comment': config.get('Comment', ''), + 'PrivateZone': config.get('PrivateZone', False), + 'CallerReference': hz.get('CallerReference', ''), + }, + 'RecordCount': { + 'Exact': record_count, + 'Estimate': hz.get('ResourceRecordSetCount', 'N/A'), + }, + 'DelegationSet': { + 'Id': delegation.get('Id', ''), + 'CallerReference': delegation.get('CallerReference', ''), + 'NameServers': delegation.get('NameServers', []), + }, + 'AssociatedVPCs': [ + {'VPCId': vpc.get('VPCId', ''), 'VPCRegion': vpc.get('VPCRegion', '')} + for vpc in vpcs + ], + 'Tags': tags, + 'DNSSEC': dnssec, + } + + +def print_report(report: dict): + """Print a human-readable summary of the zone report.""" + hz = report['HostedZone'] + rc = report['RecordCount'] + ds = report['DelegationSet'] + vpcs = report['AssociatedVPCs'] + dnssec = report['DNSSEC'] + + print(f"\ndescribe_zone.py v{VERSION}") + print("=" * 60) + print("HOSTED ZONE SUMMARY") + print("=" * 60) + + print(f"\n{'Zone ID':<22} {hz['Id']}") + print(f"{'Zone Name':<22} {hz['Name']}") + print(f"{'Account ID':<22} {report['AccountId']}") + print(f"{'Comment':<22} {hz['Comment'] or '(none)'}") + print(f"{'Private Zone':<22} {hz['PrivateZone']}") + print(f"{'Caller Reference':<22} {hz['CallerReference']}") + + print(f"\n--- Record Count {'---':-<42}") + print(f" {'Exact (paginated)':<22} {rc['Exact']}") + print(f" {'AWS Estimate':<22} {rc['Estimate']}") + + print(f"\n--- Delegation Set {'---':-<41}") + if ds['NameServers']: + print(f" {'Set ID':<22} {ds['Id'] or '(default)'}") + for i, ns in enumerate(ds['NameServers']): + label = 'Name Servers' if i == 0 else '' + print(f" {label:<22} {ns}") + else: + print(" (Private zone — no delegation set)") + + print(f"\n--- Associated VPCs {'---':-<40}") + if vpcs: + print(f" {'VPC ID':<24} {'Region'}") + print(f" {'-'*24} {'-'*15}") + for vpc in vpcs: + print(f" {vpc['VPCId']:<24} {vpc['VPCRegion']}") + else: + print(" (Public zone — no VPC associations)") + + print(f"\n--- Tags {'---':-<51}") + if report['Tags']: + for k, v in sorted(report['Tags'].items()): + print(f" {k:<30} {v}") + else: + print(" (no tags)") + + print(f"\n--- DNSSEC {'---':-<49}") + print(f" {'Status':<22} {dnssec['Status']}") + if dnssec['KeySigningKeys']: + for ksk in dnssec['KeySigningKeys']: + print(f" {'KSK':<22} {ksk['Name']} | {ksk['Status']} | {ksk['Algorithm']} | created {ksk['CreatedOn']}") + else: + print(" (no key signing keys)") + + print() + + +def export_json(report: dict, output_file: str): + with open(output_file, 'w') as f: + json.dump(report, f, indent=2, default=str) + print(f"Report exported to {output_file}") + + +def main(): + parser = argparse.ArgumentParser( + description=f'Describe a Route 53 hosted zone in detail. v{VERSION}' + ) + parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') + parser.add_argument('--profile', help='AWS profile name') + parser.add_argument('--region', default='us-gov-east-1', help='AWS region (default: us-gov-east-1)') + parser.add_argument('--output-json', metavar='FILE', help='Export report to JSON file') + parser.add_argument('--quiet', action='store_true', help='Suppress console output (use with --output-json)') + args = parser.parse_args() + + zone_id = args.zone_id.split('/')[-1] + + client = get_client(args.profile, args.region) + account_id = get_account_id(args.profile) + details = get_zone_details(client, zone_id) + record_count = get_record_count(client, zone_id) + tags = get_tags(client, zone_id) + dnssec = get_dnssec_status(client, zone_id) + + report = build_report(zone_id, details, record_count, tags, dnssec, account_id) + + if not args.quiet: + print_report(report) + + if args.output_json: + export_json(report, args.output_json) + + +if __name__ == '__main__': + main() diff --git a/local-app/python-tools/route53-migration/disassociate_vpcs.py b/local-app/python-tools/route53-migration/disassociate_vpcs.py new file mode 100755 index 00000000..1e1197de --- /dev/null +++ b/local-app/python-tools/route53-migration/disassociate_vpcs.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 +""" +disassociate_vpcs.py v1.0.0 + +Remove VPC associations from a Route 53 private hosted zone. +Supports interactive per-VPC confirmation and dry-run mode. + +Usage: + # Interactive mode (confirm each VPC) + python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account + + # Non-interactive — disassociate all VPCs without prompting + python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --yes + + # Dry run — show what would happen without making changes + python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --dry-run + + # Target specific VPCs only + python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --vpc-ids vpc-aaa111 vpc-bbb222 +""" + +import argparse +import boto3 +import json +import sys +from typing import Optional + +VERSION = "1.0.0" + + +def get_client(profile: Optional[str], region: str, service: str = 'route53'): + session = boto3.Session(profile_name=profile, region_name=region) + return session.client(service) + + +def get_zone_details(client, zone_id: str) -> dict: + try: + response = client.get_hosted_zone(Id=zone_id) + except client.exceptions.NoSuchHostedZone: + print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"ERROR fetching zone details: {e}", file=sys.stderr) + sys.exit(1) + + hz = response['HostedZone'] + config = hz.get('Config', {}) + + if not config.get('PrivateZone', False): + print("ERROR: This zone is not a private hosted zone. VPC associations only apply to private zones.", file=sys.stderr) + sys.exit(1) + + return { + 'Name': hz.get('Name', ''), + 'Comment': config.get('Comment', ''), + 'VPCs': response.get('VPCs', []), + } + + +def get_account_id(profile: Optional[str]) -> str: + try: + session = boto3.Session(profile_name=profile) + return session.client('sts').get_caller_identity()['Account'] + except Exception as e: + return f"UNKNOWN ({e})" + + +def print_header(zone_id: str, zone_name: str, account_id: str, vpcs: list): + print(f"\ndisassociate_vpcs.py v{VERSION}") + print("=" * 60) + print("VPC DISASSOCIATION") + print("=" * 60) + print(f"\n{'Zone ID':<22} {zone_id}") + print(f"{'Zone Name':<22} {zone_name}") + print(f"{'Account ID':<22} {account_id}") + print(f"\nAssociated VPCs ({len(vpcs)} total):") + print(f" {'VPC ID':<24} {'Region'}") + print(f" {'-'*24} {'-'*15}") + for vpc in vpcs: + print(f" {vpc['VPCId']:<24} {vpc['VPCRegion']}") + print() + + +def confirm_vpc(vpc: dict) -> bool: + """Prompt the user to confirm disassociation of a single VPC. Returns True to proceed.""" + while True: + answer = input(f" Disassociate {vpc['VPCId']} ({vpc['VPCRegion']})? [y/n/q]: ").strip().lower() + if answer == 'y': + return True + elif answer == 'n': + return False + elif answer == 'q': + print("\nAborted by user.") + sys.exit(0) + else: + print(" Please enter y (yes), n (no), or q (quit).") + + +def disassociate_vpc(client, zone_id: str, vpc: dict, dry_run: bool) -> bool: + """ + Disassociate a single VPC from the hosted zone. + Returns True on success (or dry-run), False on failure. + """ + vpc_id = vpc['VPCId'] + vpc_region = vpc['VPCRegion'] + + if dry_run: + print(f" [DRY RUN] Would disassociate {vpc_id} ({vpc_region})") + return True + + try: + client.disassociate_vpc_from_hosted_zone( + HostedZoneId=zone_id, + VPC={ + 'VPCRegion': vpc_region, + 'VPCId': vpc_id, + }, + Comment=f'Disassociated by disassociate_vpcs.py v{VERSION}' + ) + print(f" OK Disassociated {vpc_id} ({vpc_region})") + return True + except client.exceptions.LastVPCAssociation: + print(f" SKIP {vpc_id} — this is the last VPC association and cannot be removed " + f"without deleting the zone.", file=sys.stderr) + return False + except client.exceptions.VPCAssociationNotFound: + print(f" SKIP {vpc_id} — VPC association not found (already removed?).", file=sys.stderr) + return False + except Exception as e: + print(f" ERROR disassociating {vpc_id}: {e}", file=sys.stderr) + return False + + +def run(zone_id: str, vpcs: list, dry_run: bool, interactive: bool, client): + results = {'succeeded': [], 'skipped': [], 'failed': []} + + if dry_run: + print("DRY RUN — no changes will be made.\n") + + for vpc in vpcs: + vpc_id = vpc['VPCId'] + + if interactive and not dry_run: + proceed = confirm_vpc(vpc) + if not proceed: + print(f" Skipped {vpc_id}") + results['skipped'].append(vpc_id) + continue + + success = disassociate_vpc(client, zone_id, vpc, dry_run) + if success: + results['succeeded'].append(vpc_id) + else: + results['failed'].append(vpc_id) + + return results + + +def print_summary(results: dict, dry_run: bool): + print("\n--- Summary " + "-" * 47) + if dry_run: + print(f" Would disassociate : {len(results['succeeded'])}") + else: + print(f" Succeeded : {len(results['succeeded'])}") + print(f" Skipped : {len(results['skipped'])}") + print(f" Failed : {len(results['failed'])}") + if results['failed']: + print(f"\n Failed VPCs:") + for vpc_id in results['failed']: + print(f" {vpc_id}") + print() + + +def main(): + parser = argparse.ArgumentParser( + description=f'Remove VPC associations from a Route 53 private hosted zone. v{VERSION}' + ) + parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') + parser.add_argument('--profile', help='AWS profile name') + parser.add_argument('--region', default='us-east-1', help='AWS region (default: us-east-1)') + parser.add_argument('--vpc-ids', nargs='+', metavar='VPC_ID', + help='Only disassociate these specific VPC IDs (default: all associated VPCs)') + parser.add_argument('--dry-run', action='store_true', + help='Show what would be disassociated without making changes') + parser.add_argument('--yes', action='store_true', + help='Non-interactive: disassociate all targeted VPCs without prompting') + parser.add_argument('--output-json', metavar='FILE', + help='Write results summary to a JSON file') + args = parser.parse_args() + + # --yes and --dry-run together is fine (dry-run takes precedence, no prompts needed) + interactive = not args.yes and not args.dry_run + + zone_id = args.zone_id.split('/')[-1] + client = get_client(args.profile, args.region) + account_id = get_account_id(args.profile) + details = get_zone_details(client, zone_id) + + vpcs = details['VPCs'] + if not vpcs: + print(f"Zone {zone_id} has no VPC associations. Nothing to do.") + sys.exit(0) + + # Filter to specific VPCs if requested + if args.vpc_ids: + requested = set(args.vpc_ids) + vpcs = [v for v in vpcs if v['VPCId'] in requested] + not_found = requested - {v['VPCId'] for v in vpcs} + if not_found: + print(f"WARNING: The following VPC IDs were not found associated to this zone: " + f"{', '.join(not_found)}", file=sys.stderr) + if not vpcs: + print("No matching VPCs found. Nothing to do.") + sys.exit(0) + + print_header(zone_id, details['Name'], account_id, vpcs) + + # Warn if only one association exists + if len(details['VPCs']) == 1 and not args.dry_run: + print("WARNING: This zone has only one VPC association. AWS will not allow you to") + print(" remove it without deleting the zone entirely.\n") + + results = run(zone_id, vpcs, dry_run=args.dry_run, interactive=interactive, client=client) + print_summary(results, dry_run=args.dry_run) + + if args.output_json: + output = { + 'ToolVersion': VERSION, + 'ZoneId': zone_id, + 'ZoneName': details['Name'], + 'AccountId': account_id, + 'DryRun': args.dry_run, + 'Results': results, + } + with open(args.output_json, 'w') as f: + json.dump(output, f, indent=2) + print(f"Results written to {args.output_json}") + + +if __name__ == '__main__': + main() diff --git a/local-app/python-tools/route53-migration/example.txt b/local-app/python-tools/route53-migration/example.txt new file mode 100644 index 00000000..168f85b5 --- /dev/null +++ b/local-app/python-tools/route53-migration/example.txt @@ -0,0 +1,99 @@ +## List + +# Basic table output +python list_records.py --zone-id Z1234567890ABC + +# Use a specific AWS profile (source account) +python list_records.py --zone-id Z1234567890ABC --profile source-account + +# Export to JSON for use in migration script (step 2) +python list_records.py --zone-id Z1234567890ABC --profile source-account --output-json zone_records.json --summary + +# Export to CSV for review +python list_records.py --zone-id Z1234567890ABC --profile source-account --output-csv zone_records.csv --quiet + +# Accepts full ARN-style zone ID too +python list_records.py --zone-id /hostedzone/Z1234567890ABC + + +## Import + +# Step 1 — Export from source account +python list_records.py \ + --zone-id Z1234567890ABC \ + --profile source-account \ + --output-json zone_records.json \ + --summary + +# Step 2a — Dry run into a NEW zone in dest account +python import_records.py \ + --input-json zone_records.json \ + --zone-name example.com \ + --profile dest-account \ + --dry-run + +# Step 2b — Actually create zone and import +python import_records.py \ + --input-json zone_records.json \ + --zone-name example.com \ + --profile dest-account + +# Step 2c — Or import into an EXISTING zone +python import_records.py \ + --input-json zone_records.json \ + --zone-id Z0987654321XYZ \ + --profile dest-account \ + --action UPSERT # safe for re-runs + + +## Describe + +# Console output only +python describe_zone.py --zone-id Z1234567890ABC --profile source-account + +# Export to JSON (feeds nicely into an audit trail or migration ticket) +python describe_zone.py --zone-id Z1234567890ABC --profile source-account --output-json zone_info.json + +# Full migration prep — run all three in sequence +python describe_zone.py --zone-id Z1234567890ABC --profile source-account --output-json zone_info.json +python list_records.py --zone-id Z1234567890ABC --profile source-account --output-json zone_records.json --summary +python import_records.py --input-json zone_records.json --zone-name example.com --profile dest-account --dry-run + +## Disassociate + +# See what would be removed — no changes made +python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --dry-run + +# Interactive — confirm each VPC one at a time (default mode) +python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account + +# Non-interactive — remove all associations without prompting +python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --yes + +# Target specific VPCs only +python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account \ + --vpc-ids vpc-aaa111 vpc-bbb222 + +# Dry run with JSON output for change record / audit trail +python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account \ + --dry-run --output-json disassoc_plan.json + + +## Delete + +# Always start with a dry run +python delete_zone.py --zone-id Z1234567890ABC --profile source-account --dry-run + +# Empty zone only — two confirmation prompts, backups written first +python delete_zone.py --zone-id Z1234567890ABC --profile source-account + +# Empty and delete — backups written, then two separate confirmations +python delete_zone.py --zone-id Z1234567890ABC --profile source-account --delete-zone + +# Non-interactive pipeline use +python delete_zone.py --zone-id Z1234567890ABC --profile source-account \ + --delete-zone --yes --output-dir ./backups/ + +# Dry run with backups going to a specific directory +python delete_zone.py --zone-id Z1234567890ABC --profile source-account \ + --dry-run --output-dir ./backups/ diff --git a/local-app/python-tools/route53-migration/import_records.py b/local-app/python-tools/route53-migration/import_records.py new file mode 100755 index 00000000..8af387b4 --- /dev/null +++ b/local-app/python-tools/route53-migration/import_records.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +""" +import_records.py v1.0.0 + +Import resource records into a Route 53 hosted zone in a destination account. +Optionally creates the hosted zone if --zone-id is not provided. + +Usage: + # Create zone and import: + python import_records.py --input-json zone_records.json --zone-name example.com --profile dest-account + + # Import into existing zone: + python import_records.py --input-json zone_records.json --zone-id Z0987654321XYZ --profile dest-account +""" + +import argparse +import boto3 +import json +import sys +import time +import uuid +from typing import Optional + +VERSION = "1.0.0" + +# Record types that cannot be imported (AWS-managed or unsupported via API) +SKIP_TYPES = {'SOA', 'NS'} + +# Max records per change batch (Route 53 limit is 1000, but smaller batches are safer) +BATCH_SIZE = 100 + + +def get_client(profile: Optional[str], region: str): + session = boto3.Session(profile_name=profile, region_name=region) + return session.client('route53') + + +def create_hosted_zone(client, zone_name: str, private: bool, vpc_id: Optional[str], vpc_region: Optional[str], comment: str) -> str: + """Create a new hosted zone and return its ID.""" + if not zone_name.endswith('.'): + zone_name += '.' + + kwargs = { + 'Name': zone_name, + 'CallerReference': str(uuid.uuid4()), + 'HostedZoneConfig': { + 'Comment': comment, + 'PrivateZone': private + } + } + + if private: + if not vpc_id or not vpc_region: + print("ERROR: --vpc-id and --vpc-region are required for private hosted zones.", file=sys.stderr) + sys.exit(1) + kwargs['VPC'] = {'VPCRegion': vpc_region, 'VPCId': vpc_id} + + response = client.create_hosted_zone(**kwargs) + zone_id = response['HostedZone']['Id'].split('/')[-1] + print(f"Created hosted zone '{zone_name}' with ID: {zone_id}") + return zone_id + + +def load_records(input_file: str) -> tuple[str, list]: + """Load records from JSON export produced by list_records.py.""" + with open(input_file) as f: + data = json.load(f) + + records = data.get('ResourceRecordSets', []) + source_zone_id = data.get('HostedZoneId', 'unknown') + print(f"Loaded {len(records)} records from {input_file} (source zone: {source_zone_id})") + return source_zone_id, records + + +def filter_records(records: list, skip_types: set, skip_apex_ns: bool) -> tuple[list, list]: + """ + Split records into importable and skipped lists. + SOA and NS records at the zone apex are always skipped (AWS auto-creates them). + """ + importable = [] + skipped = [] + + for record in records: + rtype = record['Type'] + if rtype in skip_types: + skipped.append(record) + continue + importable.append(record) + + return importable, skipped + + +def build_change_batch(records: list, action: str = 'CREATE') -> dict: + """Build a Route 53 change batch from a list of resource record sets.""" + changes = [] + for record in records: + changes.append({ + 'Action': action, + 'ResourceRecordSet': record + }) + return {'Changes': changes} + + +def apply_change_batch(client, zone_id: str, batch: dict, dry_run: bool) -> Optional[str]: + """Submit a change batch to Route 53 and return the change ID.""" + if dry_run: + print(f" [DRY RUN] Would apply {len(batch['Changes'])} changes.") + return None + + response = client.change_resource_record_sets( + HostedZoneId=zone_id, + ChangeBatch=batch + ) + change_id = response['ChangeInfo']['Id'].split('/')[-1] + return change_id + + +def wait_for_change(client, change_id: str, timeout: int = 120): + """Poll until a Route 53 change batch reaches INSYNC status.""" + print(f" Waiting for change {change_id} to sync", end='', flush=True) + deadline = time.time() + timeout + while time.time() < deadline: + response = client.get_change(Id=change_id) + status = response['ChangeInfo']['Status'] + if status == 'INSYNC': + print(" done.") + return + print('.', end='', flush=True) + time.sleep(5) + print(f"\nWARNING: Change {change_id} did not reach INSYNC within {timeout}s.") + + +def import_records(client, zone_id: str, records: list, dry_run: bool, wait: bool, action: str = 'CREATE'): + """Chunk records into batches and apply them.""" + total = len(records) + print(f"\nImporting {total} records in batches of {BATCH_SIZE} (action={action}, dry_run={dry_run})...") + + for i in range(0, total, BATCH_SIZE): + chunk = records[i:i + BATCH_SIZE] + batch_num = (i // BATCH_SIZE) + 1 + print(f"\n Batch {batch_num}: {len(chunk)} records") + for r in chunk: + print(f" {r['Type']:<8} {r['Name']}") + + batch = build_change_batch(chunk, action=action) + change_id = apply_change_batch(client, zone_id, batch, dry_run) + + if change_id and wait: + wait_for_change(client, change_id) + elif change_id: + print(f" Change submitted: {change_id} (not waiting for INSYNC)") + + print(f"\nDone. {total} records processed.") + + +def print_skipped(skipped: list): + if not skipped: + return + print(f"\nSkipped {len(skipped)} records (SOA/NS — recreated automatically by AWS):") + for r in skipped: + print(f" {r['Type']:<8} {r['Name']}") + + +def main(): + parser = argparse.ArgumentParser( + description=f'Import Route 53 records into a destination account. v{VERSION}' + ) + + # Source + parser.add_argument('--input-json', required=True, + help='JSON file produced by list_records.py') + + # Destination zone — one of these is required + dest_group = parser.add_mutually_exclusive_group(required=True) + dest_group.add_argument('--zone-id', + help='Existing destination hosted zone ID') + dest_group.add_argument('--zone-name', + help='Create a new hosted zone with this name') + + # Zone creation options (only used with --zone-name) + parser.add_argument('--private', action='store_true', + help='Create a private hosted zone (requires --vpc-id and --vpc-region)') + parser.add_argument('--vpc-id', + help='VPC ID to associate with a private hosted zone') + parser.add_argument('--vpc-region', + help='VPC region for private hosted zone association') + parser.add_argument('--zone-comment', default='Migrated by import_records.py', + help='Comment for newly created hosted zone') + + # Behavior + parser.add_argument('--action', choices=['CREATE', 'UPSERT'], default='CREATE', + help='Change action: CREATE (default) or UPSERT (safe for re-runs)') + parser.add_argument('--include-ns', action='store_true', + help='Include apex NS records (not recommended — overwrites AWS-assigned NS)') + parser.add_argument('--dry-run', action='store_true', + help='Print what would be imported without making changes') + parser.add_argument('--no-wait', action='store_true', + help='Do not wait for INSYNC after each batch') + + # AWS auth + parser.add_argument('--profile', help='AWS profile for destination account') + parser.add_argument('--region', default='us-gov-east-1', help='AWS region (default: us-gov-east-1)') + + args = parser.parse_args() + + print(f"import_records.py v{VERSION}") + + client = get_client(args.profile, args.region) + + # Resolve destination zone ID + if args.zone_id: + zone_id = args.zone_id.split('/')[-1] + print(f"Using existing zone: {zone_id}") + else: + zone_id = create_hosted_zone( + client, + zone_name=args.zone_name, + private=args.private, + vpc_id=args.vpc_id, + vpc_region=args.vpc_region, + comment=args.zone_comment + ) + + # Load and filter records + source_zone_id, records = load_records(args.input_json) + + skip_types = set(SKIP_TYPES) + if args.include_ns: + skip_types.discard('NS') + + importable, skipped = filter_records(records, skip_types, skip_apex_ns=not args.include_ns) + print_skipped(skipped) + + if not importable: + print("No importable records found. Exiting.") + sys.exit(0) + + # Run the import + import_records( + client, + zone_id=zone_id, + records=importable, + dry_run=args.dry_run, + wait=not args.no_wait, + action=args.action + ) + + +if __name__ == '__main__': + main() diff --git a/local-app/python-tools/route53-migration/list_records.py b/local-app/python-tools/route53-migration/list_records.py new file mode 100755 index 00000000..e9089d73 --- /dev/null +++ b/local-app/python-tools/route53-migration/list_records.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +""" +list_records.py v1.0.1 + +List all resource records in a Route 53 hosted zone. +Changelog: + 1.0.1 - Added VERSION constant for consistency with migration toolset + 1.0.0 - Initial release + +Usage: python list_records.py --zone-id Z1234567890ABC [--profile myprofile] [--region us-gov-east-1] +""" + +import argparse +import boto3 +import json +import csv +import sys +from typing import Optional + +VERSION = "1.0.1" + + +def get_all_records(client, zone_id: str) -> list: + """Fetch all resource record sets from a hosted zone, handling pagination.""" + records = [] + paginator = client.get_paginator('list_resource_record_sets') + + try: + for page in paginator.paginate(HostedZoneId=zone_id): + records.extend(page['ResourceRecordSets']) + except client.exceptions.NoSuchHostedZone: + print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"ERROR: {e}", file=sys.stderr) + sys.exit(1) + + return records + + +def format_record_value(record: dict) -> str: + """Extract a human-readable value from a resource record set.""" + if 'AliasTarget' in record: + alias = record['AliasTarget'] + return f"ALIAS -> {alias['DNSName']} (zone: {alias['HostedZoneId']}, evaluate-health: {alias['EvaluateTargetHealth']})" + + values = [rr['Value'] for rr in record.get('ResourceRecords', [])] + return ' | '.join(values) + + +def print_table(records: list, zone_id: str): + """Print records in a formatted table.""" + print(f"\nlist_records.py v{VERSION}") + print(f"Hosted Zone: {zone_id}") + print(f"Total Records: {len(records)}\n") + + col_widths = {'name': 50, 'type': 8, 'ttl': 8, 'value': 80} + header = ( + f"{'Name':<{col_widths['name']}} " + f"{'Type':<{col_widths['type']}} " + f"{'TTL':<{col_widths['ttl']}} " + f"Value" + ) + print(header) + print('-' * (sum(col_widths.values()) + 3)) + + for record in sorted(records, key=lambda r: (r['Name'], r['Type'])): + name = record['Name'] + rtype = record['Type'] + ttl = str(record.get('TTL', 'N/A')) + value = format_record_value(record) + + values = value.split(' | ') + print(f"{name:<{col_widths['name']}} {rtype:<{col_widths['type']}} {ttl:<{col_widths['ttl']}} {values[0]}") + for v in values[1:]: + print(f"{'':<{col_widths['name']}} {'':<{col_widths['type']}} {'':<{col_widths['ttl']}} {v}") + + +def export_json(records: list, output_file: str, zone_id: str): + """Export records to JSON.""" + data = { + 'HostedZoneId': zone_id, + 'RecordCount': len(records), + 'ResourceRecordSets': records + } + with open(output_file, 'w') as f: + json.dump(data, f, indent=2, default=str) + print(f"Exported {len(records)} records to {output_file}") + + +def export_csv(records: list, output_file: str): + """Export records to CSV.""" + with open(output_file, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['Name', 'Type', 'TTL', 'Value', 'IsAlias', 'AliasTarget', 'AliasZoneId']) + + for record in sorted(records, key=lambda r: (r['Name'], r['Type'])): + is_alias = 'AliasTarget' in record + alias_dns = record.get('AliasTarget', {}).get('DNSName', '') + alias_zone = record.get('AliasTarget', {}).get('HostedZoneId', '') + ttl = record.get('TTL', '') + + if is_alias: + writer.writerow([record['Name'], record['Type'], ttl, '', True, alias_dns, alias_zone]) + else: + values = [rr['Value'] for rr in record.get('ResourceRecords', [])] + for value in values: + writer.writerow([record['Name'], record['Type'], ttl, value, False, '', '']) + + print(f"Exported records to {output_file}") + + +def print_summary(records: list): + """Print a summary of record types.""" + type_counts = {} + for record in records: + rtype = record['Type'] + type_counts[rtype] = type_counts.get(rtype, 0) + 1 + + print("\nRecord Type Summary:") + print('-' * 20) + for rtype, count in sorted(type_counts.items()): + print(f" {rtype:<10} {count}") + print(f" {'TOTAL':<10} {len(records)}") + + +def main(): + parser = argparse.ArgumentParser(description=f'List all resource records in a Route 53 hosted zone. v{VERSION}') + parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') + parser.add_argument('--profile', help='AWS profile name') + parser.add_argument('--region', default='us-gov-east-1', help='AWS region (default: us-gov-east-1)') + parser.add_argument('--output-json', metavar='FILE', help='Export records to JSON file') + parser.add_argument('--output-csv', metavar='FILE', help='Export records to CSV file') + parser.add_argument('--summary', action='store_true', help='Print record type summary') + parser.add_argument('--quiet', action='store_true', help='Suppress table output (useful with --output-*)') + args = parser.parse_args() + + session = boto3.Session(profile_name=args.profile, region_name=args.region) + client = session.client('route53') + + zone_id = args.zone_id.split('/')[-1] + records = get_all_records(client, zone_id) + + if not args.quiet: + print_table(records, zone_id) + + if args.summary or args.quiet: + print_summary(records) + + if args.output_json: + export_json(records, args.output_json, zone_id) + + if args.output_csv: + export_csv(records, args.output_csv) + + +if __name__ == '__main__': + main()