From bff2c4f3abecaa08b33f16ef92d348b5d86ba5f0 Mon Sep 17 00:00:00 2001 From: badra001 Date: Wed, 7 Jan 2026 12:17:18 -0500 Subject: [PATCH] split buckets and progress bars --- .../cross-organization/archive-cloudtrail.py | 217 ++++++++---------- 1 file changed, 102 insertions(+), 115 deletions(-) diff --git a/local-app/python-tools/cross-organization/archive-cloudtrail.py b/local-app/python-tools/cross-organization/archive-cloudtrail.py index 18c1bad9..8c9a122b 100755 --- a/local-app/python-tools/cross-organization/archive-cloudtrail.py +++ b/local-app/python-tools/cross-organization/archive-cloudtrail.py @@ -6,29 +6,48 @@ import time import argparse import concurrent.futures +import threading +import random from tqdm import tqdm -from botocore.exceptions import ClientError +from botocore.exceptions import ClientError, BotoCoreError -__version__ = "1.0.5" +__version__ = "1.0.7" def get_args(): parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}") - parser.add_argument("--profile", default="default", help="Initial AWS profile to use") - parser.add_argument("--region", required=True, help="Initial region for the master STS connection") - parser.add_argument("--role-name", required=True, help="Role name to assume in member accounts") - parser.add_argument("--threads", type=int, default=16, help="Max threads for copying (default: 16)") - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without copying data") - parser.add_argument("--input", default="migration_data.csv", help="Path to input CSV file") + parser.add_argument("--profile", default="default", help="Initial AWS profile") + parser.add_argument("--region", required=True, help="Initial region for master connection") + parser.add_argument("--role-name", required=True, help="Role name to assume across accounts") + parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations (default: 16)") + parser.add_argument("--dry-run", action="store_true", help="Dry run mode") + parser.add_argument("--input", default="migration_data.csv", help="Input CSV file") return parser.parse_args() +def format_size(bytes): + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes < 1024.0: return f"{bytes:.2f} {unit}" + bytes /= 1024.0 + +class DisplayManager: + """Manages vertical terminal slots for concurrent tqdm bars.""" + def __init__(self, max_slots): + self.slots = list(range(1, max_slots + 1)) + self.lock = threading.Lock() + + def acquire_slot(self): + with self.lock: + return self.slots.pop(0) if self.slots else None + + def release_slot(self, slot): + if slot is not None: + with self.lock: + self.slots.append(slot) + self.slots.sort() + def get_assumed_session(sts_master, partition, account_id, region, role_name): - """Assume role into member account using the master identity.""" try: role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}" - res = sts_master.assume_role( - RoleArn=role_arn, - RoleSessionName="S3MigrationWorker" - ) + res = sts_master.assume_role(RoleArn=role_arn, RoleSessionName="S3MigratorV107") c = res['Credentials'] return boto3.Session( aws_access_key_id=c['AccessKeyId'], @@ -36,145 +55,113 @@ def get_assumed_session(sts_master, partition, account_id, region, role_name): aws_session_token=c['SessionToken'], region_name=region ) - except Exception as e: - return None - -def copy_worker(dest_s3, s_bucket, s_key, d_bucket, d_key, pbar, dry_run): - """Execution worker for individual object copies.""" - if dry_run: - pbar.update(1) - return True - try: - # Server-side copy: Data moves within AWS network - dest_s3.copy({'Bucket': s_bucket, 'Key': s_key}, d_bucket, d_key) - pbar.update(1) - return True - except Exception: - return False - -def process_bucket(row, sts_master, partition, args, overall_pbar): + except Exception: return None + +def s3_copy_with_backoff(dest_s3, copy_source, d_bucket, d_key, max_retries=5): + """Executes S3 copy with exponential backoff for 503 SlowDown errors.""" + for attempt in range(max_retries): + try: + dest_s3.copy(copy_source, d_bucket, d_key) + return True + except ClientError as e: + code = e.response.get('Error', {}).get('Code') + if code == 'SlowDown' and attempt < max_retries - 1: + # Exponential backoff with jitter + sleep_time = (2 ** attempt) + random.random() + time.sleep(sleep_time) + continue + return False + except (BotoCoreError, Exception): + return False + +def process_bucket(row, sts_master, partition, args, overall_pbar, display_manager): + slot = display_manager.acquire_slot() start_time = time.perf_counter() + s_acc, s_reg, s_bucket = row['source_account_id'], row['source_region'], row['source_bucket'] + d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket'] - # Mapping based on the new CSV order - s_acc = row['source_account_id'] - s_reg = row['source_region'] - s_bucket = row['source_bucket'] - d_acc = row['destination_account_id'] - d_reg = row['destination_region'] - d_bucket = row['destination_bucket'] - - # Path Logic: Default to archive/source_account_id if blank - d_path = row.get('destination_path', '').strip() or f"archive/{s_acc}" - d_path = d_path.rstrip('/') + '/' + d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/' - # Establish Sessions src_session = get_assumed_session(sts_master, partition, s_acc, s_reg, args.role_name) dest_session = get_assumed_session(sts_master, partition, d_acc, d_reg, args.role_name) if not src_session or not dest_session: - overall_pbar.write(f"ERROR: Could not assume roles for Acc {s_acc}") overall_pbar.update(1) + display_manager.release_slot(slot) return - src_s3 = src_session.client('s3') - dest_s3 = dest_session.client('s3') - - # 1. Marker Check + src_s3, dest_s3 = src_session.client('s3'), dest_session.client('s3') MARKER_FILE = "archived" + try: src_s3.head_object(Bucket=s_bucket, Key=MARKER_FILE) - overall_pbar.write(f"SKIP: {s_bucket} in {s_reg} already archived.") overall_pbar.update(1) + display_manager.release_slot(slot) return - except ClientError: - pass + except ClientError: pass - # 2. Inventory - objects = [] - total_bytes = 0 + # Inventory + objects, total_bytes = [], 0 paginator = src_s3.get_paginator('list_objects_v2') - try: - for page in paginator.paginate(Bucket=s_bucket): - for obj in page.get('Contents', []): - if obj['Key'] != MARKER_FILE: - objects.append(obj) - total_bytes += obj['Size'] - except Exception as e: - overall_pbar.write(f"ERROR: Failed to list {s_bucket}: {e}") - overall_pbar.update(1) - return + for page in paginator.paginate(Bucket=s_bucket): + for obj in page.get('Contents', []): + if obj['Key'] != MARKER_FILE: + objects.append(obj) + total_bytes += obj['Size'] if not objects: overall_pbar.update(1) + display_manager.release_slot(slot) return - # 3. High-Performance Multi-threaded Copy - desc = f"Acc {s_acc[-4:]} ({s_reg})" - with tqdm(total=len(objects), desc=desc, unit="obj", leave=False, position=1) as pbar: - with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: - futures = [] - for obj in objects: - target_key = f"{d_path}{obj['Key']}" - futures.append(executor.submit( - copy_worker, dest_s3, s_bucket, obj['Key'], d_bucket, target_key, pbar, args.dry_run - )) - - results = [f.result() for f in concurrent.futures.as_completed(futures)] - - # 4. Finalize & JSON Reporting + # Visual Bar for this specific bucket migration + desc = f"Bucket: {s_bucket[:15]}... ({format_size(total_bytes)})" + with tqdm(total=len(objects), desc=desc, unit="obj", leave=False, position=slot) as pbar: + results = [] + for obj in objects: + if not args.dry_run: + res = s3_copy_with_backoff(dest_s3, {'Bucket': s_bucket, 'Key': obj['Key']}, d_bucket, f"{d_path}{obj['Key']}") + results.append(res) + else: + results.append(True) + pbar.update(1) + + # Finalize if all(results) and not args.dry_run: elapsed = time.perf_counter() - start_time - src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="migrated") - + src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="archived") report = { - "source": {"account": s_acc, "bucket": s_bucket, "region": s_reg}, - "destination": {"account": d_acc, "bucket": d_bucket, "region": d_reg, "path": d_path}, - "stats": { - "objects": len(objects), - "bytes": total_bytes, - "seconds": round(elapsed, 2), - "mb_per_sec": round((total_bytes/1e6)/elapsed, 2) if elapsed > 0 else 0 - }, - "version": __version__ + "version": __version__, "source_account": s_acc, "size": format_size(total_bytes), + "objects": len(objects), "seconds": round(elapsed, 2) } with open(f"migration_{s_acc}_{s_reg}.json", "w") as f: json.dump(report, f, indent=4) - - elif args.dry_run: - overall_pbar.write(f"DRY RUN: {s_bucket} ({s_reg}) -> {len(objects)} objects found.") overall_pbar.update(1) + display_manager.release_slot(slot) def main(): args = get_args() - - # 1. Initialize Master Session + print("="*80) + print(f"S3 ORG MIGRATOR v{__version__} | Region: {args.region} | Threads: {args.threads}") + print(f"Role: {args.role_name} | Input: {args.input} | Dry Run: {args.dry_run}") + print("="*80) + master_session = boto3.Session(profile_name=args.profile, region_name=args.region) sts_master = master_session.client('sts') - - # 2. Identify Partition - try: - identity = sts_master.get_caller_identity() - partition = identity['Arn'].split(':')[1] - except Exception as e: - print(f"FATAL: Could not verify master identity: {e}") - return - - print(f"--- S3 Org Migrator v{__version__} ---") - print(f"Partition: {partition} | Profile: {args.profile} | Region: {args.region}") - if args.dry_run: print("!!! DRY RUN ENABLED !!!") + partition = sts_master.get_caller_identity()['Arn'].split(':')[1] - # 3. Process Input - try: - with open(args.input, 'r') as f: - rows = list(csv.DictReader(f)) - except FileNotFoundError: - print(f"FATAL: Input file '{args.input}' not found.") - return + display_manager = DisplayManager(max_slots=args.threads) + + with open(args.input, 'r') as f: + rows = list(csv.DictReader(f)) + + with tqdm(total=len(rows), desc="OVERALL PROGRESS", position=0) as overall_pbar: + with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: + futures = [executor.submit(process_bucket, row, sts_master, partition, args, overall_pbar, display_manager) for row in rows] + concurrent.futures.wait(futures) - with tqdm(total=len(rows), desc="Overall Progress", position=0) as overall_pbar: - for row in rows: - process_bucket(row, sts_master, partition, args, overall_pbar) + print("\n" * (args.threads + 1) + "Migration Task Complete.") if __name__ == "__main__": main()