From dd925ae34edfe194f9cb6d819766a43c99d093c9 Mon Sep 17 00:00:00 2001 From: badra001 Date: Wed, 7 Jan 2026 12:22:12 -0500 Subject: [PATCH] add output --- .../cross-organization/archive-cloudtrail.py | 147 +++++++++++++----- 1 file changed, 109 insertions(+), 38 deletions(-) diff --git a/local-app/python-tools/cross-organization/archive-cloudtrail.py b/local-app/python-tools/cross-organization/archive-cloudtrail.py index c2f47f37..b91a1eea 100755 --- a/local-app/python-tools/cross-organization/archive-cloudtrail.py +++ b/local-app/python-tools/cross-organization/archive-cloudtrail.py @@ -8,23 +8,26 @@ import concurrent.futures import threading import random -import sys +import os +from datetime import datetime from tqdm import tqdm from botocore.exceptions import ClientError, BotoCoreError -__version__ = "1.0.8" +__version__ = "1.0.9" -# Global lock for thread-safe terminal writing -Tqdm_Lock = threading.Lock() +# Global lock for thread-safe UI updates +UI_LOCK = threading.Lock() def get_args(): parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}") parser.add_argument("--profile", default="default", help="Initial AWS profile") parser.add_argument("--region", required=True, help="Initial region for master connection") parser.add_argument("--role-name", required=True, help="Role name to assume across accounts") - parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations (default: 16)") + parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations (slots)") parser.add_argument("--dry-run", action="store_true", help="Dry run mode") parser.add_argument("--input", default="migration_data.csv", help="Input CSV file") + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + parser.add_argument("--output", default=f"archive_results_{ts}.json", help="Summary output file") return parser.parse_args() def format_size(bytes): @@ -33,12 +36,12 @@ def format_size(bytes): bytes /= 1024.0 class DisplayManager: - """Manages vertical terminal slots to prevent bars from overwriting each other.""" + """Manages vertical terminal slots to ensure bars stay on their assigned lines.""" def __init__(self, max_slots): self.slots = list(range(1, max_slots + 1)) self.lock = threading.Lock() - # Pre-warm the terminal: print empty lines so tqdm has 'real estate' - print("\n" * max_slots) + # Pre-allocate lines on the terminal + print("\n" * (max_slots + 1)) def acquire_slot(self): with self.lock: @@ -51,74 +54,142 @@ def release_slot(self, slot): self.slots.sort() def s3_copy_with_backoff(dest_s3, copy_source, d_bucket, d_key, max_retries=5): + retries = 0 for attempt in range(max_retries): try: dest_s3.copy(copy_source, d_bucket, d_key) - return True + return True, retries except ClientError as e: - if e.response.get('Error', {}).get('Code') == 'SlowDown' and attempt < max_retries - 1: + code = e.response.get('Error', {}).get('Code') + if code == 'SlowDown' and attempt < max_retries - 1: + retries += 1 time.sleep((2 ** attempt) + random.random()) continue - return False - except: return False + return False, retries + except Exception: + return False, retries def process_bucket(row, sts_master, partition, args, overall_pbar, display_manager): slot = display_manager.acquire_slot() - s_acc, s_reg, s_bucket = row['source_account_id'], row['source_region'], row['source_bucket'] - d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket'] + start_time = time.perf_counter() + + s_acc = row['source_account_id'] + s_reg = row['source_region'] + s_bucket = row['source_bucket'] + d_acc = row['destination_account_id'] + d_reg = row['destination_region'] + d_bucket = row['destination_bucket'] d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/' - # Simple logic to simulate or execute + result_entry = { + "source": s_bucket, "account": s_acc, "objects": 0, "bytes": 0, + "errors": 0, "retries": 0, "status": "failed", "elapsed": 0 + } + try: - # Auth (Assuming role logic remains the same as v1.0.7) + # Assume Role role_arn = f"arn:{partition}:iam::{s_acc}:role/{args.role_name}" - res = sts_master.assume_role(RoleArn=role_arn, RoleSessionName="S3MigratorV108") - creds = res['Credentials'] - src_s3 = boto3.Session(creds['AccessKeyId'], creds['SecretAccessKey'], creds['SessionToken'], region_name=s_reg).client('s3') + res = sts_master.assume_role(RoleArn=role_arn, RoleSessionName=f"Migrator-{s_acc}") + c = res['Credentials'] + src_s3 = boto3.Session(c['AccessKeyId'], c['SecretAccessKey'], c['SessionToken'], region_name=s_reg).client('s3') - # Pull Inventory + dest_role_arn = f"arn:{partition}:iam::{d_acc}:role/{args.role_name}" + res_d = sts_master.assume_role(RoleArn=dest_role_arn, RoleSessionName=f"Migrator-Dest-{d_acc}") + cd = res_d['Credentials'] + dest_s3 = boto3.Session(cd['AccessKeyId'], cd['SecretAccessKey'], cd['SessionToken'], region_name=d_reg).client('s3') + + # Marker Check + MARKER_FILE = "archived" + try: + src_s3.head_object(Bucket=s_bucket, Key=MARKER_FILE) + result_entry["status"] = "skipped (marker)" + return result_entry + except: pass + + # Inventory paginator = src_s3.get_paginator('list_objects_v2') objects = [] for page in paginator.paginate(Bucket=s_bucket): - objects.extend(page.get('Contents', [])) + for obj in page.get('Contents', []): + if obj['Key'] != MARKER_FILE: + objects.append(obj) + result_entry["bytes"] += obj['Size'] + + if not objects: + result_entry["status"] = "empty" + return result_entry + + result_entry["objects"] = len(objects) + desc = f"Acc {s_acc[-4:]} | {s_bucket[:10]}" - # Dashboard Bar - desc = f"Bucket: {s_bucket[:12]}" - with tqdm(total=len(objects), desc=desc, unit="obj", position=slot, leave=False, lock=Tqdm_Lock) as pbar: + with tqdm(total=len(objects), desc=desc, position=slot, leave=False, lock=UI_LOCK) as pbar: for obj in objects: if not args.dry_run: - # In a real run, you'd use the dest_s3 client here - time.sleep(0.01) # Simulating API call + success, retries = s3_copy_with_backoff(dest_s3, {'Bucket': s_bucket, 'Key': obj['Key']}, d_bucket, f"{d_path}{obj['Key']}") + result_entry["retries"] += retries + if not success: result_entry["errors"] += 1 pbar.update(1) - + + if result_entry["errors"] == 0 and not args.dry_run: + src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="archived") + result_entry["status"] = "success" + elif args.dry_run: + result_entry["status"] = "dry-run" + + except Exception as e: + result_entry["status"] = f"error: {str(e)}" finally: + result_entry["elapsed"] = round(time.perf_counter() - start_time, 2) overall_pbar.update(1) display_manager.release_slot(slot) + return result_entry def main(): args = get_args() + print("="*80) + print(f"S3 ORG MIGRATOR v{__version__} | Region: {args.region} | Threads: {args.threads}") + print(f"Role: {args.role_name} | Dry Run: {args.dry_run} | Output: {args.output}") + print("="*80) + master_session = boto3.Session(profile_name=args.profile, region_name=args.region) sts_master = master_session.client('sts') partition = sts_master.get_caller_identity()['Arn'].split(':')[1] - # Header - print(f"--- S3 Org Migrator v{__version__} ---") - print(f"Role: {args.role_name} | Threads: {args.threads}") - with open(args.input, 'r') as f: rows = list(csv.DictReader(f)) display_manager = DisplayManager(max_slots=args.threads) + results = [] - # Position 0 is reserved for the Overall Progress bar - with tqdm(total=len(rows), desc="OVERALL", position=0, lock=Tqdm_Lock) as overall_pbar: + with tqdm(total=len(rows), desc="OVERALL", position=0, lock=UI_LOCK) as overall_pbar: with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: futures = [executor.submit(process_bucket, row, sts_master, partition, args, overall_pbar, display_manager) for row in rows] - concurrent.futures.wait(futures) - - # Clean exit: move cursor past the dashboard area - print("\n" * (args.threads + 1)) - print("All tasks completed.") + for fut in concurrent.futures.as_completed(futures): + results.append(fut.result()) + + # Generate Output File + with open(args.output, "w") as f: + json.dump(results, f, indent=4) + + # Calculate Summary + total_acc = len(set(r['account'] for r in results)) + total_buckets = len(results) + total_objs = sum(r['objects'] for r in results) + total_bytes = sum(r['bytes'] for r in results) + total_errors = sum(r['errors'] for r in results) + total_retries = sum(r['retries'] for r in results) + + print("\n" * (args.threads + 2)) + print("="*80) + print("MIGRATION SUMMARY") + print(f"Accounts Processed: {total_acc}") + print(f"Buckets Processed: {total_buckets}") + print(f"Objects Processed: {total_objs}") + print(f"Total Data Moved: {format_size(total_bytes)}") + print(f"Total Errors: {total_errors}") + print(f"Total S3 Retries: {total_retries}") + print(f"Full results saved to: {args.output}") + print("="*80) if __name__ == "__main__": main()