From c2b671b52390578b40134afa0a9ada4707c1efc2 Mon Sep 17 00:00:00 2001 From: badra001 Date: Fri, 27 Feb 2026 15:51:12 -0500 Subject: [PATCH] rename --- local-app/python-tools/route53/delete_zone.py | 395 ------------------ .../python-tools/route53/describe_zone.py | 235 ----------- .../python-tools/route53/disassociate_vpcs.py | 241 ----------- local-app/python-tools/route53/example.txt | 99 ----- .../python-tools/route53/import_records.py | 250 ----------- .../python-tools/route53/list_records.py | 158 ------- 6 files changed, 1378 deletions(-) delete mode 100755 local-app/python-tools/route53/delete_zone.py delete mode 100755 local-app/python-tools/route53/describe_zone.py delete mode 100755 local-app/python-tools/route53/disassociate_vpcs.py delete mode 100644 local-app/python-tools/route53/example.txt delete mode 100755 local-app/python-tools/route53/import_records.py delete mode 100755 local-app/python-tools/route53/list_records.py diff --git a/local-app/python-tools/route53/delete_zone.py b/local-app/python-tools/route53/delete_zone.py deleted file mode 100755 index 1f072ce2..00000000 --- a/local-app/python-tools/route53/delete_zone.py +++ /dev/null @@ -1,395 +0,0 @@ -#!/usr/bin/env python3 -""" -delete_zone.py v1.0.0 - -Remove all deletable resource records from a Route 53 hosted zone, -then optionally delete the zone itself. - -SOA and apex NS records are skipped — AWS requires them to be present -and will reject attempts to delete them. - -Safety backups of all records and zone metadata are always written -before any destructive action is taken. - -Usage: - # Dry run — show what would be deleted, write backups - python delete_zone.py --zone-id Z1234567890ABC --profile source-account --dry-run - - # Empty the zone (delete records, keep zone) - python delete_zone.py --zone-id Z1234567890ABC --profile source-account - - # Empty and delete the zone - python delete_zone.py --zone-id Z1234567890ABC --profile source-account --delete-zone - - # Non-interactive (no confirmation prompts) - python delete_zone.py --zone-id Z1234567890ABC --profile source-account --delete-zone --yes -""" - -import argparse -import boto3 -import json -import os -import sys -import time -from datetime import datetime, timezone -from typing import Optional - -VERSION = "1.0.0" - -SKIP_TYPES = {'SOA', 'NS'} -BATCH_SIZE = 100 - - -def get_client(profile: Optional[str], region: str, service: str = 'route53'): - session = boto3.Session(profile_name=profile, region_name=region) - return session.client(service) - - -def get_account_id(profile: Optional[str]) -> str: - try: - session = boto3.Session(profile_name=profile) - return session.client('sts').get_caller_identity()['Account'] - except Exception as e: - return f"UNKNOWN ({e})" - - -def get_zone_details(client, zone_id: str) -> dict: - try: - response = client.get_hosted_zone(Id=zone_id) - except client.exceptions.NoSuchHostedZone: - print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) - sys.exit(1) - except Exception as e: - print(f"ERROR fetching zone details: {e}", file=sys.stderr) - sys.exit(1) - - return { - 'HostedZone': response['HostedZone'], - 'DelegationSet': response.get('DelegationSet', {}), - 'VPCs': response.get('VPCs', []), - } - - -def get_all_records(client, zone_id: str) -> list: - records = [] - paginator = client.get_paginator('list_resource_record_sets') - for page in paginator.paginate(HostedZoneId=zone_id): - records.extend(page['ResourceRecordSets']) - return records - - -def get_tags(client, zone_id: str) -> dict: - try: - response = client.list_tags_for_resource( - ResourceType='hostedzone', - ResourceId=zone_id - ) - return {tag['Key']: tag['Value'] for tag in response['ResourceTagSet']['Tags']} - except Exception as e: - print(f"WARNING: Could not fetch tags: {e}", file=sys.stderr) - return {} - - -# --------------------------------------------------------------------------- -# Safety backups -# --------------------------------------------------------------------------- - -def backup_filename(zone_id: str, suffix: str, output_dir: str) -> str: - ts = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ') - return os.path.join(output_dir, f"{zone_id}_{suffix}_{ts}.json") - - -def write_records_backup(records: list, zone_id: str, output_dir: str) -> str: - path = backup_filename(zone_id, 'records', output_dir) - data = { - 'SchemaVersion': '1.0', - 'ToolVersion': VERSION, - 'HostedZoneId': zone_id, - 'RecordCount': len(records), - 'ExportedAt': datetime.now(timezone.utc).isoformat(), - 'ResourceRecordSets': records, - } - with open(path, 'w') as f: - json.dump(data, f, indent=2, default=str) - print(f" Records backup : {path}") - return path - - -def write_zone_backup(details: dict, tags: dict, account_id: str, - zone_id: str, output_dir: str) -> str: - path = backup_filename(zone_id, 'zone', output_dir) - hz = details['HostedZone'] - config = hz.get('Config', {}) - delegation = details.get('DelegationSet', {}) - - data = { - 'SchemaVersion': '1.0', - 'ToolVersion': VERSION, - 'AccountId': account_id, - 'ExportedAt': datetime.now(timezone.utc).isoformat(), - 'HostedZone': { - 'Id': zone_id, - 'Name': hz.get('Name', ''), - 'Comment': config.get('Comment', ''), - 'PrivateZone': config.get('PrivateZone', False), - 'CallerReference': hz.get('CallerReference', ''), - 'ResourceRecordSetCount': hz.get('ResourceRecordSetCount', 'N/A'), - }, - 'DelegationSet': { - 'Id': delegation.get('Id', ''), - 'CallerReference': delegation.get('CallerReference', ''), - 'NameServers': delegation.get('NameServers', []), - }, - 'AssociatedVPCs': [ - {'VPCId': v.get('VPCId', ''), 'VPCRegion': v.get('VPCRegion', '')} - for v in details.get('VPCs', []) - ], - 'Tags': tags, - } - with open(path, 'w') as f: - json.dump(data, f, indent=2, default=str) - print(f" Zone backup : {path}") - return path - - -# --------------------------------------------------------------------------- -# Record deletion -# --------------------------------------------------------------------------- - -def partition_records(records: list, zone_name: str) -> tuple[list, list]: - """ - Split records into deletable and skipped (SOA + apex NS). - zone_name should be the fully-qualified zone apex (e.g. 'example.com.'). - """ - deletable = [] - skipped = [] - - for record in records: - rtype = record['Type'] - rname = record['Name'] - - # Always skip SOA - if rtype == 'SOA': - skipped.append(record) - continue - - # Skip apex NS (AWS-managed); non-apex NS delegations are deletable - if rtype == 'NS' and rname == zone_name: - skipped.append(record) - continue - - deletable.append(record) - - return deletable, skipped - - -def build_delete_batch(records: list) -> dict: - return { - 'Changes': [ - {'Action': 'DELETE', 'ResourceRecordSet': r} - for r in records - ] - } - - -def wait_for_change(client, change_id: str, timeout: int = 120): - print(f" Waiting for change {change_id}", end='', flush=True) - deadline = time.time() + timeout - while time.time() < deadline: - status = client.get_change(Id=change_id)['ChangeInfo']['Status'] - if status == 'INSYNC': - print(" done.") - return - print('.', end='', flush=True) - time.sleep(5) - print(f"\n WARNING: Change {change_id} did not reach INSYNC within {timeout}s.") - - -def delete_records(client, zone_id: str, records: list, dry_run: bool) -> dict: - results = {'succeeded': [], 'failed': []} - total = len(records) - - if total == 0: - print("\n No deletable records found.") - return results - - print(f"\n Deleting {total} records in batches of {BATCH_SIZE}...") - - for i in range(0, total, BATCH_SIZE): - chunk = records[i:i + BATCH_SIZE] - batch_num = (i // BATCH_SIZE) + 1 - print(f"\n Batch {batch_num} ({len(chunk)} records):") - for r in chunk: - print(f" DELETE {r['Type']:<8} {r['Name']}") - - if dry_run: - print(f" [DRY RUN] Skipping actual deletion.") - results['succeeded'].extend([r['Name'] for r in chunk]) - continue - - try: - batch = build_delete_batch(chunk) - response = client.change_resource_record_sets( - HostedZoneId=zone_id, - ChangeBatch=batch - ) - change_id = response['ChangeInfo']['Id'].split('/')[-1] - wait_for_change(client, change_id) - results['succeeded'].extend([r['Name'] for r in chunk]) - except Exception as e: - print(f" ERROR in batch {batch_num}: {e}", file=sys.stderr) - results['failed'].extend([r['Name'] for r in chunk]) - - return results - - -# --------------------------------------------------------------------------- -# Zone deletion -# --------------------------------------------------------------------------- - -def delete_hosted_zone(client, zone_id: str, dry_run: bool) -> bool: - if dry_run: - print(f"\n [DRY RUN] Would delete hosted zone {zone_id}.") - return True - try: - client.delete_hosted_zone(Id=zone_id) - print(f"\n Hosted zone {zone_id} deleted successfully.") - return True - except client.exceptions.HostedZoneNotEmpty as e: - print(f"\n ERROR: Zone is not empty — cannot delete: {e}", file=sys.stderr) - return False - except Exception as e: - print(f"\n ERROR deleting zone: {e}", file=sys.stderr) - return False - - -# --------------------------------------------------------------------------- -# Confirmation prompt -# --------------------------------------------------------------------------- - -def confirm(prompt: str) -> bool: - while True: - answer = input(prompt + " [yes/no]: ").strip().lower() - if answer == 'yes': - return True - elif answer == 'no': - return False - print(" Please type 'yes' or 'no'.") - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - -def main(): - parser = argparse.ArgumentParser( - description=f'Empty a Route 53 hosted zone and optionally delete it. v{VERSION}' - ) - parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') - parser.add_argument('--profile', help='AWS profile name') - parser.add_argument('--region', default='us-east-1', help='AWS region (default: us-east-1)') - parser.add_argument('--delete-zone', action='store_true', - help='Delete the hosted zone after emptying it') - parser.add_argument('--dry-run', action='store_true', - help='Show what would happen without making any changes') - parser.add_argument('--yes', action='store_true', - help='Skip confirmation prompts') - parser.add_argument('--output-dir', default='.', - help='Directory for backup JSON files (default: current directory)') - args = parser.parse_args() - - zone_id = args.zone_id.split('/')[-1] - client = get_client(args.profile, args.region) - account_id = get_account_id(args.profile) - details = get_zone_details(client, zone_id) - tags = get_tags(client, zone_id) - records = get_all_records(client, zone_id) - - hz = details['HostedZone'] - zone_name = hz.get('Name', '') - is_private = hz.get('Config', {}).get('PrivateZone', False) - - print(f"\ndelete_zone.py v{VERSION}") - print("=" * 60) - print("ZONE DELETION PLAN") - print("=" * 60) - print(f"\n{'Zone ID':<22} {zone_id}") - print(f"{'Zone Name':<22} {zone_name}") - print(f"{'Account ID':<22} {account_id}") - print(f"{'Private Zone':<22} {is_private}") - print(f"{'Total Records':<22} {len(records)}") - - deletable, skipped = partition_records(records, zone_name) - print(f"{'Deletable Records':<22} {len(deletable)}") - print(f"{'Skipped (SOA/NS)':<22} {len(skipped)}") - print(f"{'Delete Zone':<22} {args.delete_zone}") - print(f"{'Dry Run':<22} {args.dry_run}") - - # Always write safety backups before touching anything - print("\n--- Safety Backups " + "-" * 40) - if args.dry_run: - print(" [DRY RUN] Backups would be written here:") - print(f" Records backup : {backup_filename(zone_id, 'records', args.output_dir)}") - print(f" Zone backup : {backup_filename(zone_id, 'zone', args.output_dir)}") - else: - os.makedirs(args.output_dir, exist_ok=True) - write_records_backup(records, zone_id, args.output_dir) - write_zone_backup(details, tags, account_id, zone_id, args.output_dir) - - # Confirm record deletion - print("\n--- Record Deletion " + "-" * 40) - if deletable: - for r in sorted(deletable, key=lambda x: (x['Name'], x['Type'])): - print(f" DELETE {r['Type']:<8} {r['Name']}") - else: - print(" Nothing to delete.") - - if skipped: - print(f"\n Skipping (AWS-managed):") - for r in skipped: - print(f" KEEP {r['Type']:<8} {r['Name']}") - - if deletable: - if not args.dry_run and not args.yes: - if not confirm(f"\nDelete {len(deletable)} records from zone {zone_id}?"): - print("Aborted.") - sys.exit(0) - - delete_results = delete_records(client, zone_id, deletable, args.dry_run) - else: - delete_results = {'succeeded': [], 'failed': []} - - # Confirm zone deletion - if args.delete_zone: - print("\n--- Zone Deletion " + "-" * 42) - if delete_results['failed']: - print(" WARNING: Some record deletions failed. Skipping zone deletion to avoid data loss.") - else: - if not args.dry_run and not args.yes: - if not confirm(f"Permanently delete hosted zone {zone_id} ({zone_name})?"): - print(" Zone deletion skipped.") - else: - delete_hosted_zone(client, zone_id, args.dry_run) - else: - delete_hosted_zone(client, zone_id, args.dry_run) - - # Summary - print("\n--- Summary " + "-" * 47) - if args.dry_run: - print(f" Would delete records : {len(deletable)}") - print(f" Would keep (SOA/NS) : {len(skipped)}") - if args.delete_zone: - print(f" Would delete zone : {zone_id}") - else: - print(f" Records succeeded : {len(delete_results['succeeded'])}") - print(f" Records failed : {len(delete_results['failed'])}") - print(f" Kept (SOA/NS) : {len(skipped)}") - if delete_results['failed']: - print(f"\n Failed records:") - for name in delete_results['failed']: - print(f" {name}") - print() - - -if __name__ == '__main__': - main() diff --git a/local-app/python-tools/route53/describe_zone.py b/local-app/python-tools/route53/describe_zone.py deleted file mode 100755 index b324f13a..00000000 --- a/local-app/python-tools/route53/describe_zone.py +++ /dev/null @@ -1,235 +0,0 @@ -#!/usr/bin/env python3 -""" -describe_zone.py v1.0.0 - -Describe a Route 53 hosted zone in detail: metadata, record count, -delegation set, associated VPCs, tags, and DNSSEC status. - -Usage: - python describe_zone.py --zone-id Z1234567890ABC [--profile myprofile] [--output-json zone_info.json] -""" - -import argparse -import boto3 -import json -import sys -from typing import Optional - -VERSION = "1.0.0" - - -def get_client(profile: Optional[str], region: str): - session = boto3.Session(profile_name=profile, region_name=region) - return session.client('route53') - - -def get_zone_details(client, zone_id: str) -> dict: - """Fetch core hosted zone metadata.""" - try: - response = client.get_hosted_zone(Id=zone_id) - except client.exceptions.NoSuchHostedZone: - print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) - sys.exit(1) - except Exception as e: - print(f"ERROR fetching zone details: {e}", file=sys.stderr) - sys.exit(1) - - return { - 'HostedZone': response['HostedZone'], - 'DelegationSet': response.get('DelegationSet', {}), - 'VPCs': response.get('VPCs', []), - } - - -def get_record_count(client, zone_id: str) -> int: - """ - Return the exact record count by paginating all RRs. - The ResourceRecordSetCount in GetHostedZone is an estimate and can lag. - """ - count = 0 - paginator = client.get_paginator('list_resource_record_sets') - for page in paginator.paginate(HostedZoneId=zone_id): - count += len(page['ResourceRecordSets']) - return count - - -def get_tags(client, zone_id: str) -> dict: - """Fetch tags for the hosted zone.""" - try: - response = client.list_tags_for_resource( - ResourceType='hostedzone', - ResourceId=zone_id - ) - return {tag['Key']: tag['Value'] for tag in response['ResourceTagSet']['Tags']} - except Exception as e: - print(f"WARNING: Could not fetch tags: {e}", file=sys.stderr) - return {} - - -def get_dnssec_status(client, zone_id: str) -> dict: - """Fetch DNSSEC signing status (public zones only).""" - try: - response = client.get_dnssec(HostedZoneId=zone_id) - return { - 'Status': response['Status'].get('ServeSignature', 'UNKNOWN'), - 'KeySigningKeys': [ - { - 'Name': ksk['Name'], - 'Status': ksk['Status'], - 'Algorithm': ksk.get('SigningAlgorithmMnemonic', ''), - 'CreatedOn': str(ksk.get('CreatedDate', '')), - } - for ksk in response.get('KeySigningKeys', []) - ] - } - except client.exceptions.NoSuchHostedZone: - return {'Status': 'NOT_APPLICABLE', 'KeySigningKeys': []} - except Exception: - # Private zones and GovCloud may not support DNSSEC - return {'Status': 'NOT_APPLICABLE', 'KeySigningKeys': []} - - -def get_account_id(profile: Optional[str]) -> str: - """Resolve the AWS account ID for the active credentials.""" - try: - session = boto3.Session(profile_name=profile) - sts = session.client('sts') - return sts.get_caller_identity()['Account'] - except Exception as e: - return f"UNKNOWN ({e})" - - -def build_report(zone_id: str, details: dict, record_count: int, tags: dict, - dnssec: dict, account_id: str) -> dict: - """Assemble all collected data into a structured report dict.""" - hz = details['HostedZone'] - config = hz.get('Config', {}) - delegation = details.get('DelegationSet', {}) - vpcs = details.get('VPCs', []) - - return { - 'SchemaVersion': '1.0', - 'ToolVersion': VERSION, - 'AccountId': account_id, - 'HostedZone': { - 'Id': zone_id, - 'Name': hz.get('Name', ''), - 'Comment': config.get('Comment', ''), - 'PrivateZone': config.get('PrivateZone', False), - 'CallerReference': hz.get('CallerReference', ''), - }, - 'RecordCount': { - 'Exact': record_count, - 'Estimate': hz.get('ResourceRecordSetCount', 'N/A'), - }, - 'DelegationSet': { - 'Id': delegation.get('Id', ''), - 'CallerReference': delegation.get('CallerReference', ''), - 'NameServers': delegation.get('NameServers', []), - }, - 'AssociatedVPCs': [ - {'VPCId': vpc.get('VPCId', ''), 'VPCRegion': vpc.get('VPCRegion', '')} - for vpc in vpcs - ], - 'Tags': tags, - 'DNSSEC': dnssec, - } - - -def print_report(report: dict): - """Print a human-readable summary of the zone report.""" - hz = report['HostedZone'] - rc = report['RecordCount'] - ds = report['DelegationSet'] - vpcs = report['AssociatedVPCs'] - dnssec = report['DNSSEC'] - - print(f"\ndescribe_zone.py v{VERSION}") - print("=" * 60) - print("HOSTED ZONE SUMMARY") - print("=" * 60) - - print(f"\n{'Zone ID':<22} {hz['Id']}") - print(f"{'Zone Name':<22} {hz['Name']}") - print(f"{'Account ID':<22} {report['AccountId']}") - print(f"{'Comment':<22} {hz['Comment'] or '(none)'}") - print(f"{'Private Zone':<22} {hz['PrivateZone']}") - print(f"{'Caller Reference':<22} {hz['CallerReference']}") - - print(f"\n--- Record Count {'---':-<42}") - print(f" {'Exact (paginated)':<22} {rc['Exact']}") - print(f" {'AWS Estimate':<22} {rc['Estimate']}") - - print(f"\n--- Delegation Set {'---':-<41}") - if ds['NameServers']: - print(f" {'Set ID':<22} {ds['Id'] or '(default)'}") - for i, ns in enumerate(ds['NameServers']): - label = 'Name Servers' if i == 0 else '' - print(f" {label:<22} {ns}") - else: - print(" (Private zone — no delegation set)") - - print(f"\n--- Associated VPCs {'---':-<40}") - if vpcs: - print(f" {'VPC ID':<24} {'Region'}") - print(f" {'-'*24} {'-'*15}") - for vpc in vpcs: - print(f" {vpc['VPCId']:<24} {vpc['VPCRegion']}") - else: - print(" (Public zone — no VPC associations)") - - print(f"\n--- Tags {'---':-<51}") - if report['Tags']: - for k, v in sorted(report['Tags'].items()): - print(f" {k:<30} {v}") - else: - print(" (no tags)") - - print(f"\n--- DNSSEC {'---':-<49}") - print(f" {'Status':<22} {dnssec['Status']}") - if dnssec['KeySigningKeys']: - for ksk in dnssec['KeySigningKeys']: - print(f" {'KSK':<22} {ksk['Name']} | {ksk['Status']} | {ksk['Algorithm']} | created {ksk['CreatedOn']}") - else: - print(" (no key signing keys)") - - print() - - -def export_json(report: dict, output_file: str): - with open(output_file, 'w') as f: - json.dump(report, f, indent=2, default=str) - print(f"Report exported to {output_file}") - - -def main(): - parser = argparse.ArgumentParser( - description=f'Describe a Route 53 hosted zone in detail. v{VERSION}' - ) - parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') - parser.add_argument('--profile', help='AWS profile name') - parser.add_argument('--region', default='us-gov-east-1', help='AWS region (default: us-gov-east-1)') - parser.add_argument('--output-json', metavar='FILE', help='Export report to JSON file') - parser.add_argument('--quiet', action='store_true', help='Suppress console output (use with --output-json)') - args = parser.parse_args() - - zone_id = args.zone_id.split('/')[-1] - - client = get_client(args.profile, args.region) - account_id = get_account_id(args.profile) - details = get_zone_details(client, zone_id) - record_count = get_record_count(client, zone_id) - tags = get_tags(client, zone_id) - dnssec = get_dnssec_status(client, zone_id) - - report = build_report(zone_id, details, record_count, tags, dnssec, account_id) - - if not args.quiet: - print_report(report) - - if args.output_json: - export_json(report, args.output_json) - - -if __name__ == '__main__': - main() diff --git a/local-app/python-tools/route53/disassociate_vpcs.py b/local-app/python-tools/route53/disassociate_vpcs.py deleted file mode 100755 index 1e1197de..00000000 --- a/local-app/python-tools/route53/disassociate_vpcs.py +++ /dev/null @@ -1,241 +0,0 @@ -#!/usr/bin/env python3 -""" -disassociate_vpcs.py v1.0.0 - -Remove VPC associations from a Route 53 private hosted zone. -Supports interactive per-VPC confirmation and dry-run mode. - -Usage: - # Interactive mode (confirm each VPC) - python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account - - # Non-interactive — disassociate all VPCs without prompting - python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --yes - - # Dry run — show what would happen without making changes - python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --dry-run - - # Target specific VPCs only - python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --vpc-ids vpc-aaa111 vpc-bbb222 -""" - -import argparse -import boto3 -import json -import sys -from typing import Optional - -VERSION = "1.0.0" - - -def get_client(profile: Optional[str], region: str, service: str = 'route53'): - session = boto3.Session(profile_name=profile, region_name=region) - return session.client(service) - - -def get_zone_details(client, zone_id: str) -> dict: - try: - response = client.get_hosted_zone(Id=zone_id) - except client.exceptions.NoSuchHostedZone: - print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) - sys.exit(1) - except Exception as e: - print(f"ERROR fetching zone details: {e}", file=sys.stderr) - sys.exit(1) - - hz = response['HostedZone'] - config = hz.get('Config', {}) - - if not config.get('PrivateZone', False): - print("ERROR: This zone is not a private hosted zone. VPC associations only apply to private zones.", file=sys.stderr) - sys.exit(1) - - return { - 'Name': hz.get('Name', ''), - 'Comment': config.get('Comment', ''), - 'VPCs': response.get('VPCs', []), - } - - -def get_account_id(profile: Optional[str]) -> str: - try: - session = boto3.Session(profile_name=profile) - return session.client('sts').get_caller_identity()['Account'] - except Exception as e: - return f"UNKNOWN ({e})" - - -def print_header(zone_id: str, zone_name: str, account_id: str, vpcs: list): - print(f"\ndisassociate_vpcs.py v{VERSION}") - print("=" * 60) - print("VPC DISASSOCIATION") - print("=" * 60) - print(f"\n{'Zone ID':<22} {zone_id}") - print(f"{'Zone Name':<22} {zone_name}") - print(f"{'Account ID':<22} {account_id}") - print(f"\nAssociated VPCs ({len(vpcs)} total):") - print(f" {'VPC ID':<24} {'Region'}") - print(f" {'-'*24} {'-'*15}") - for vpc in vpcs: - print(f" {vpc['VPCId']:<24} {vpc['VPCRegion']}") - print() - - -def confirm_vpc(vpc: dict) -> bool: - """Prompt the user to confirm disassociation of a single VPC. Returns True to proceed.""" - while True: - answer = input(f" Disassociate {vpc['VPCId']} ({vpc['VPCRegion']})? [y/n/q]: ").strip().lower() - if answer == 'y': - return True - elif answer == 'n': - return False - elif answer == 'q': - print("\nAborted by user.") - sys.exit(0) - else: - print(" Please enter y (yes), n (no), or q (quit).") - - -def disassociate_vpc(client, zone_id: str, vpc: dict, dry_run: bool) -> bool: - """ - Disassociate a single VPC from the hosted zone. - Returns True on success (or dry-run), False on failure. - """ - vpc_id = vpc['VPCId'] - vpc_region = vpc['VPCRegion'] - - if dry_run: - print(f" [DRY RUN] Would disassociate {vpc_id} ({vpc_region})") - return True - - try: - client.disassociate_vpc_from_hosted_zone( - HostedZoneId=zone_id, - VPC={ - 'VPCRegion': vpc_region, - 'VPCId': vpc_id, - }, - Comment=f'Disassociated by disassociate_vpcs.py v{VERSION}' - ) - print(f" OK Disassociated {vpc_id} ({vpc_region})") - return True - except client.exceptions.LastVPCAssociation: - print(f" SKIP {vpc_id} — this is the last VPC association and cannot be removed " - f"without deleting the zone.", file=sys.stderr) - return False - except client.exceptions.VPCAssociationNotFound: - print(f" SKIP {vpc_id} — VPC association not found (already removed?).", file=sys.stderr) - return False - except Exception as e: - print(f" ERROR disassociating {vpc_id}: {e}", file=sys.stderr) - return False - - -def run(zone_id: str, vpcs: list, dry_run: bool, interactive: bool, client): - results = {'succeeded': [], 'skipped': [], 'failed': []} - - if dry_run: - print("DRY RUN — no changes will be made.\n") - - for vpc in vpcs: - vpc_id = vpc['VPCId'] - - if interactive and not dry_run: - proceed = confirm_vpc(vpc) - if not proceed: - print(f" Skipped {vpc_id}") - results['skipped'].append(vpc_id) - continue - - success = disassociate_vpc(client, zone_id, vpc, dry_run) - if success: - results['succeeded'].append(vpc_id) - else: - results['failed'].append(vpc_id) - - return results - - -def print_summary(results: dict, dry_run: bool): - print("\n--- Summary " + "-" * 47) - if dry_run: - print(f" Would disassociate : {len(results['succeeded'])}") - else: - print(f" Succeeded : {len(results['succeeded'])}") - print(f" Skipped : {len(results['skipped'])}") - print(f" Failed : {len(results['failed'])}") - if results['failed']: - print(f"\n Failed VPCs:") - for vpc_id in results['failed']: - print(f" {vpc_id}") - print() - - -def main(): - parser = argparse.ArgumentParser( - description=f'Remove VPC associations from a Route 53 private hosted zone. v{VERSION}' - ) - parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') - parser.add_argument('--profile', help='AWS profile name') - parser.add_argument('--region', default='us-east-1', help='AWS region (default: us-east-1)') - parser.add_argument('--vpc-ids', nargs='+', metavar='VPC_ID', - help='Only disassociate these specific VPC IDs (default: all associated VPCs)') - parser.add_argument('--dry-run', action='store_true', - help='Show what would be disassociated without making changes') - parser.add_argument('--yes', action='store_true', - help='Non-interactive: disassociate all targeted VPCs without prompting') - parser.add_argument('--output-json', metavar='FILE', - help='Write results summary to a JSON file') - args = parser.parse_args() - - # --yes and --dry-run together is fine (dry-run takes precedence, no prompts needed) - interactive = not args.yes and not args.dry_run - - zone_id = args.zone_id.split('/')[-1] - client = get_client(args.profile, args.region) - account_id = get_account_id(args.profile) - details = get_zone_details(client, zone_id) - - vpcs = details['VPCs'] - if not vpcs: - print(f"Zone {zone_id} has no VPC associations. Nothing to do.") - sys.exit(0) - - # Filter to specific VPCs if requested - if args.vpc_ids: - requested = set(args.vpc_ids) - vpcs = [v for v in vpcs if v['VPCId'] in requested] - not_found = requested - {v['VPCId'] for v in vpcs} - if not_found: - print(f"WARNING: The following VPC IDs were not found associated to this zone: " - f"{', '.join(not_found)}", file=sys.stderr) - if not vpcs: - print("No matching VPCs found. Nothing to do.") - sys.exit(0) - - print_header(zone_id, details['Name'], account_id, vpcs) - - # Warn if only one association exists - if len(details['VPCs']) == 1 and not args.dry_run: - print("WARNING: This zone has only one VPC association. AWS will not allow you to") - print(" remove it without deleting the zone entirely.\n") - - results = run(zone_id, vpcs, dry_run=args.dry_run, interactive=interactive, client=client) - print_summary(results, dry_run=args.dry_run) - - if args.output_json: - output = { - 'ToolVersion': VERSION, - 'ZoneId': zone_id, - 'ZoneName': details['Name'], - 'AccountId': account_id, - 'DryRun': args.dry_run, - 'Results': results, - } - with open(args.output_json, 'w') as f: - json.dump(output, f, indent=2) - print(f"Results written to {args.output_json}") - - -if __name__ == '__main__': - main() diff --git a/local-app/python-tools/route53/example.txt b/local-app/python-tools/route53/example.txt deleted file mode 100644 index 168f85b5..00000000 --- a/local-app/python-tools/route53/example.txt +++ /dev/null @@ -1,99 +0,0 @@ -## List - -# Basic table output -python list_records.py --zone-id Z1234567890ABC - -# Use a specific AWS profile (source account) -python list_records.py --zone-id Z1234567890ABC --profile source-account - -# Export to JSON for use in migration script (step 2) -python list_records.py --zone-id Z1234567890ABC --profile source-account --output-json zone_records.json --summary - -# Export to CSV for review -python list_records.py --zone-id Z1234567890ABC --profile source-account --output-csv zone_records.csv --quiet - -# Accepts full ARN-style zone ID too -python list_records.py --zone-id /hostedzone/Z1234567890ABC - - -## Import - -# Step 1 — Export from source account -python list_records.py \ - --zone-id Z1234567890ABC \ - --profile source-account \ - --output-json zone_records.json \ - --summary - -# Step 2a — Dry run into a NEW zone in dest account -python import_records.py \ - --input-json zone_records.json \ - --zone-name example.com \ - --profile dest-account \ - --dry-run - -# Step 2b — Actually create zone and import -python import_records.py \ - --input-json zone_records.json \ - --zone-name example.com \ - --profile dest-account - -# Step 2c — Or import into an EXISTING zone -python import_records.py \ - --input-json zone_records.json \ - --zone-id Z0987654321XYZ \ - --profile dest-account \ - --action UPSERT # safe for re-runs - - -## Describe - -# Console output only -python describe_zone.py --zone-id Z1234567890ABC --profile source-account - -# Export to JSON (feeds nicely into an audit trail or migration ticket) -python describe_zone.py --zone-id Z1234567890ABC --profile source-account --output-json zone_info.json - -# Full migration prep — run all three in sequence -python describe_zone.py --zone-id Z1234567890ABC --profile source-account --output-json zone_info.json -python list_records.py --zone-id Z1234567890ABC --profile source-account --output-json zone_records.json --summary -python import_records.py --input-json zone_records.json --zone-name example.com --profile dest-account --dry-run - -## Disassociate - -# See what would be removed — no changes made -python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --dry-run - -# Interactive — confirm each VPC one at a time (default mode) -python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account - -# Non-interactive — remove all associations without prompting -python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account --yes - -# Target specific VPCs only -python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account \ - --vpc-ids vpc-aaa111 vpc-bbb222 - -# Dry run with JSON output for change record / audit trail -python disassociate_vpcs.py --zone-id Z1234567890ABC --profile source-account \ - --dry-run --output-json disassoc_plan.json - - -## Delete - -# Always start with a dry run -python delete_zone.py --zone-id Z1234567890ABC --profile source-account --dry-run - -# Empty zone only — two confirmation prompts, backups written first -python delete_zone.py --zone-id Z1234567890ABC --profile source-account - -# Empty and delete — backups written, then two separate confirmations -python delete_zone.py --zone-id Z1234567890ABC --profile source-account --delete-zone - -# Non-interactive pipeline use -python delete_zone.py --zone-id Z1234567890ABC --profile source-account \ - --delete-zone --yes --output-dir ./backups/ - -# Dry run with backups going to a specific directory -python delete_zone.py --zone-id Z1234567890ABC --profile source-account \ - --dry-run --output-dir ./backups/ diff --git a/local-app/python-tools/route53/import_records.py b/local-app/python-tools/route53/import_records.py deleted file mode 100755 index 8af387b4..00000000 --- a/local-app/python-tools/route53/import_records.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/env python3 -""" -import_records.py v1.0.0 - -Import resource records into a Route 53 hosted zone in a destination account. -Optionally creates the hosted zone if --zone-id is not provided. - -Usage: - # Create zone and import: - python import_records.py --input-json zone_records.json --zone-name example.com --profile dest-account - - # Import into existing zone: - python import_records.py --input-json zone_records.json --zone-id Z0987654321XYZ --profile dest-account -""" - -import argparse -import boto3 -import json -import sys -import time -import uuid -from typing import Optional - -VERSION = "1.0.0" - -# Record types that cannot be imported (AWS-managed or unsupported via API) -SKIP_TYPES = {'SOA', 'NS'} - -# Max records per change batch (Route 53 limit is 1000, but smaller batches are safer) -BATCH_SIZE = 100 - - -def get_client(profile: Optional[str], region: str): - session = boto3.Session(profile_name=profile, region_name=region) - return session.client('route53') - - -def create_hosted_zone(client, zone_name: str, private: bool, vpc_id: Optional[str], vpc_region: Optional[str], comment: str) -> str: - """Create a new hosted zone and return its ID.""" - if not zone_name.endswith('.'): - zone_name += '.' - - kwargs = { - 'Name': zone_name, - 'CallerReference': str(uuid.uuid4()), - 'HostedZoneConfig': { - 'Comment': comment, - 'PrivateZone': private - } - } - - if private: - if not vpc_id or not vpc_region: - print("ERROR: --vpc-id and --vpc-region are required for private hosted zones.", file=sys.stderr) - sys.exit(1) - kwargs['VPC'] = {'VPCRegion': vpc_region, 'VPCId': vpc_id} - - response = client.create_hosted_zone(**kwargs) - zone_id = response['HostedZone']['Id'].split('/')[-1] - print(f"Created hosted zone '{zone_name}' with ID: {zone_id}") - return zone_id - - -def load_records(input_file: str) -> tuple[str, list]: - """Load records from JSON export produced by list_records.py.""" - with open(input_file) as f: - data = json.load(f) - - records = data.get('ResourceRecordSets', []) - source_zone_id = data.get('HostedZoneId', 'unknown') - print(f"Loaded {len(records)} records from {input_file} (source zone: {source_zone_id})") - return source_zone_id, records - - -def filter_records(records: list, skip_types: set, skip_apex_ns: bool) -> tuple[list, list]: - """ - Split records into importable and skipped lists. - SOA and NS records at the zone apex are always skipped (AWS auto-creates them). - """ - importable = [] - skipped = [] - - for record in records: - rtype = record['Type'] - if rtype in skip_types: - skipped.append(record) - continue - importable.append(record) - - return importable, skipped - - -def build_change_batch(records: list, action: str = 'CREATE') -> dict: - """Build a Route 53 change batch from a list of resource record sets.""" - changes = [] - for record in records: - changes.append({ - 'Action': action, - 'ResourceRecordSet': record - }) - return {'Changes': changes} - - -def apply_change_batch(client, zone_id: str, batch: dict, dry_run: bool) -> Optional[str]: - """Submit a change batch to Route 53 and return the change ID.""" - if dry_run: - print(f" [DRY RUN] Would apply {len(batch['Changes'])} changes.") - return None - - response = client.change_resource_record_sets( - HostedZoneId=zone_id, - ChangeBatch=batch - ) - change_id = response['ChangeInfo']['Id'].split('/')[-1] - return change_id - - -def wait_for_change(client, change_id: str, timeout: int = 120): - """Poll until a Route 53 change batch reaches INSYNC status.""" - print(f" Waiting for change {change_id} to sync", end='', flush=True) - deadline = time.time() + timeout - while time.time() < deadline: - response = client.get_change(Id=change_id) - status = response['ChangeInfo']['Status'] - if status == 'INSYNC': - print(" done.") - return - print('.', end='', flush=True) - time.sleep(5) - print(f"\nWARNING: Change {change_id} did not reach INSYNC within {timeout}s.") - - -def import_records(client, zone_id: str, records: list, dry_run: bool, wait: bool, action: str = 'CREATE'): - """Chunk records into batches and apply them.""" - total = len(records) - print(f"\nImporting {total} records in batches of {BATCH_SIZE} (action={action}, dry_run={dry_run})...") - - for i in range(0, total, BATCH_SIZE): - chunk = records[i:i + BATCH_SIZE] - batch_num = (i // BATCH_SIZE) + 1 - print(f"\n Batch {batch_num}: {len(chunk)} records") - for r in chunk: - print(f" {r['Type']:<8} {r['Name']}") - - batch = build_change_batch(chunk, action=action) - change_id = apply_change_batch(client, zone_id, batch, dry_run) - - if change_id and wait: - wait_for_change(client, change_id) - elif change_id: - print(f" Change submitted: {change_id} (not waiting for INSYNC)") - - print(f"\nDone. {total} records processed.") - - -def print_skipped(skipped: list): - if not skipped: - return - print(f"\nSkipped {len(skipped)} records (SOA/NS — recreated automatically by AWS):") - for r in skipped: - print(f" {r['Type']:<8} {r['Name']}") - - -def main(): - parser = argparse.ArgumentParser( - description=f'Import Route 53 records into a destination account. v{VERSION}' - ) - - # Source - parser.add_argument('--input-json', required=True, - help='JSON file produced by list_records.py') - - # Destination zone — one of these is required - dest_group = parser.add_mutually_exclusive_group(required=True) - dest_group.add_argument('--zone-id', - help='Existing destination hosted zone ID') - dest_group.add_argument('--zone-name', - help='Create a new hosted zone with this name') - - # Zone creation options (only used with --zone-name) - parser.add_argument('--private', action='store_true', - help='Create a private hosted zone (requires --vpc-id and --vpc-region)') - parser.add_argument('--vpc-id', - help='VPC ID to associate with a private hosted zone') - parser.add_argument('--vpc-region', - help='VPC region for private hosted zone association') - parser.add_argument('--zone-comment', default='Migrated by import_records.py', - help='Comment for newly created hosted zone') - - # Behavior - parser.add_argument('--action', choices=['CREATE', 'UPSERT'], default='CREATE', - help='Change action: CREATE (default) or UPSERT (safe for re-runs)') - parser.add_argument('--include-ns', action='store_true', - help='Include apex NS records (not recommended — overwrites AWS-assigned NS)') - parser.add_argument('--dry-run', action='store_true', - help='Print what would be imported without making changes') - parser.add_argument('--no-wait', action='store_true', - help='Do not wait for INSYNC after each batch') - - # AWS auth - parser.add_argument('--profile', help='AWS profile for destination account') - parser.add_argument('--region', default='us-gov-east-1', help='AWS region (default: us-gov-east-1)') - - args = parser.parse_args() - - print(f"import_records.py v{VERSION}") - - client = get_client(args.profile, args.region) - - # Resolve destination zone ID - if args.zone_id: - zone_id = args.zone_id.split('/')[-1] - print(f"Using existing zone: {zone_id}") - else: - zone_id = create_hosted_zone( - client, - zone_name=args.zone_name, - private=args.private, - vpc_id=args.vpc_id, - vpc_region=args.vpc_region, - comment=args.zone_comment - ) - - # Load and filter records - source_zone_id, records = load_records(args.input_json) - - skip_types = set(SKIP_TYPES) - if args.include_ns: - skip_types.discard('NS') - - importable, skipped = filter_records(records, skip_types, skip_apex_ns=not args.include_ns) - print_skipped(skipped) - - if not importable: - print("No importable records found. Exiting.") - sys.exit(0) - - # Run the import - import_records( - client, - zone_id=zone_id, - records=importable, - dry_run=args.dry_run, - wait=not args.no_wait, - action=args.action - ) - - -if __name__ == '__main__': - main() diff --git a/local-app/python-tools/route53/list_records.py b/local-app/python-tools/route53/list_records.py deleted file mode 100755 index e9089d73..00000000 --- a/local-app/python-tools/route53/list_records.py +++ /dev/null @@ -1,158 +0,0 @@ -#!/usr/bin/env python3 -""" -list_records.py v1.0.1 - -List all resource records in a Route 53 hosted zone. -Changelog: - 1.0.1 - Added VERSION constant for consistency with migration toolset - 1.0.0 - Initial release - -Usage: python list_records.py --zone-id Z1234567890ABC [--profile myprofile] [--region us-gov-east-1] -""" - -import argparse -import boto3 -import json -import csv -import sys -from typing import Optional - -VERSION = "1.0.1" - - -def get_all_records(client, zone_id: str) -> list: - """Fetch all resource record sets from a hosted zone, handling pagination.""" - records = [] - paginator = client.get_paginator('list_resource_record_sets') - - try: - for page in paginator.paginate(HostedZoneId=zone_id): - records.extend(page['ResourceRecordSets']) - except client.exceptions.NoSuchHostedZone: - print(f"ERROR: Hosted zone '{zone_id}' not found.", file=sys.stderr) - sys.exit(1) - except Exception as e: - print(f"ERROR: {e}", file=sys.stderr) - sys.exit(1) - - return records - - -def format_record_value(record: dict) -> str: - """Extract a human-readable value from a resource record set.""" - if 'AliasTarget' in record: - alias = record['AliasTarget'] - return f"ALIAS -> {alias['DNSName']} (zone: {alias['HostedZoneId']}, evaluate-health: {alias['EvaluateTargetHealth']})" - - values = [rr['Value'] for rr in record.get('ResourceRecords', [])] - return ' | '.join(values) - - -def print_table(records: list, zone_id: str): - """Print records in a formatted table.""" - print(f"\nlist_records.py v{VERSION}") - print(f"Hosted Zone: {zone_id}") - print(f"Total Records: {len(records)}\n") - - col_widths = {'name': 50, 'type': 8, 'ttl': 8, 'value': 80} - header = ( - f"{'Name':<{col_widths['name']}} " - f"{'Type':<{col_widths['type']}} " - f"{'TTL':<{col_widths['ttl']}} " - f"Value" - ) - print(header) - print('-' * (sum(col_widths.values()) + 3)) - - for record in sorted(records, key=lambda r: (r['Name'], r['Type'])): - name = record['Name'] - rtype = record['Type'] - ttl = str(record.get('TTL', 'N/A')) - value = format_record_value(record) - - values = value.split(' | ') - print(f"{name:<{col_widths['name']}} {rtype:<{col_widths['type']}} {ttl:<{col_widths['ttl']}} {values[0]}") - for v in values[1:]: - print(f"{'':<{col_widths['name']}} {'':<{col_widths['type']}} {'':<{col_widths['ttl']}} {v}") - - -def export_json(records: list, output_file: str, zone_id: str): - """Export records to JSON.""" - data = { - 'HostedZoneId': zone_id, - 'RecordCount': len(records), - 'ResourceRecordSets': records - } - with open(output_file, 'w') as f: - json.dump(data, f, indent=2, default=str) - print(f"Exported {len(records)} records to {output_file}") - - -def export_csv(records: list, output_file: str): - """Export records to CSV.""" - with open(output_file, 'w', newline='') as f: - writer = csv.writer(f) - writer.writerow(['Name', 'Type', 'TTL', 'Value', 'IsAlias', 'AliasTarget', 'AliasZoneId']) - - for record in sorted(records, key=lambda r: (r['Name'], r['Type'])): - is_alias = 'AliasTarget' in record - alias_dns = record.get('AliasTarget', {}).get('DNSName', '') - alias_zone = record.get('AliasTarget', {}).get('HostedZoneId', '') - ttl = record.get('TTL', '') - - if is_alias: - writer.writerow([record['Name'], record['Type'], ttl, '', True, alias_dns, alias_zone]) - else: - values = [rr['Value'] for rr in record.get('ResourceRecords', [])] - for value in values: - writer.writerow([record['Name'], record['Type'], ttl, value, False, '', '']) - - print(f"Exported records to {output_file}") - - -def print_summary(records: list): - """Print a summary of record types.""" - type_counts = {} - for record in records: - rtype = record['Type'] - type_counts[rtype] = type_counts.get(rtype, 0) + 1 - - print("\nRecord Type Summary:") - print('-' * 20) - for rtype, count in sorted(type_counts.items()): - print(f" {rtype:<10} {count}") - print(f" {'TOTAL':<10} {len(records)}") - - -def main(): - parser = argparse.ArgumentParser(description=f'List all resource records in a Route 53 hosted zone. v{VERSION}') - parser.add_argument('--zone-id', required=True, help='Hosted Zone ID (e.g., Z1234567890ABC)') - parser.add_argument('--profile', help='AWS profile name') - parser.add_argument('--region', default='us-gov-east-1', help='AWS region (default: us-gov-east-1)') - parser.add_argument('--output-json', metavar='FILE', help='Export records to JSON file') - parser.add_argument('--output-csv', metavar='FILE', help='Export records to CSV file') - parser.add_argument('--summary', action='store_true', help='Print record type summary') - parser.add_argument('--quiet', action='store_true', help='Suppress table output (useful with --output-*)') - args = parser.parse_args() - - session = boto3.Session(profile_name=args.profile, region_name=args.region) - client = session.client('route53') - - zone_id = args.zone_id.split('/')[-1] - records = get_all_records(client, zone_id) - - if not args.quiet: - print_table(records, zone_id) - - if args.summary or args.quiet: - print_summary(records) - - if args.output_json: - export_json(records, args.output_json, zone_id) - - if args.output_csv: - export_csv(records, args.output_csv) - - -if __name__ == '__main__': - main()