diff --git a/local-app/python-tools/cross-organization/archive-cloudtrail.py b/local-app/python-tools/cross-organization/archive-cloudtrail.py index deb32692..8153f73d 100755 --- a/local-app/python-tools/cross-organization/archive-cloudtrail.py +++ b/local-app/python-tools/cross-organization/archive-cloudtrail.py @@ -1,4 +1,5 @@ #!/bin/env python + import boto3 import csv import json @@ -7,12 +8,11 @@ import concurrent.futures import threading import random -import os from datetime import datetime from tqdm import tqdm -from botocore.exceptions import ClientError, BotoCoreError +from botocore.exceptions import ClientError -__version__ = "1.0.10" +__version__ = "1.0.12" # Global lock for thread-safe UI updates UI_LOCK = threading.RLock() @@ -20,9 +20,9 @@ def get_args(): parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}") - parser.add_argument("--profile", default="default", help="Initial AWS profile") - parser.add_argument("--region", required=True, help="Initial region for master connection") - parser.add_argument("--role-name", required=True, help="Role name to assume across accounts") + parser.add_argument("--profile", required=True, help="Local AWS profile name (source of credentials)") + parser.add_argument("--region", required=True, help="Initial region for master STS connection") + parser.add_argument("--role-name", required=True, help="Role name to assume in source/dest accounts") parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations") parser.add_argument("--dry-run", action="store_true", help="Dry run mode") parser.add_argument("--input", default="migration_data.csv", help="Input CSV file") @@ -36,16 +36,14 @@ def format_size(bytes): bytes /= 1024.0 class DisplayManager: - """Manages vertical terminal slots for concurrent progress bars.""" def __init__(self, max_slots): self.slots = list(range(1, max_slots + 1)) self.lock = threading.Lock() - # Pre-allocate terminal space - print("\n" * (max_slots + 1)) + for _ in range(max_slots + 2): print("") + print(f"\033[{max_slots + 2}A", end="") def acquire_slot(self): - with self.lock: - return self.slots.pop(0) if self.slots else None + with self.lock: return self.slots.pop(0) if self.slots else None def release_slot(self, slot): if slot is not None: @@ -53,106 +51,86 @@ def release_slot(self, slot): self.slots.append(slot) self.slots.sort() -def s3_copy_with_backoff(dest_s3, copy_source, d_bucket, d_key, max_retries=5): - retries = 0 - for attempt in range(max_retries): - try: - dest_s3.copy(copy_source, d_bucket, d_key) - return True, retries - except ClientError as e: - if e.response.get('Error', {}).get('Code') == 'SlowDown' and attempt < max_retries - 1: - retries += 1 - time.sleep((2 ** attempt) + random.random()) - continue - return False, retries - except Exception: - return False, retries +def get_assumed_client(sts_client, partition, account_id, region, role_name, service='s3'): + """Assume role using the master STS client and return a service client.""" + role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}" + try: + response = sts_client.assume_role( + RoleArn=role_arn, + RoleSessionName=f"MigrationSession-{account_id}" + ) + creds = response['Credentials'] + return boto3.client( + service, + aws_access_key_id=creds['AccessKeyId'], + aws_secret_access_key=creds['SecretAccessKey'], + aws_session_token=creds['SessionToken'], + region_name=region + ) + except Exception as e: + raise Exception(f"AssumeRole failed for {account_id}: {str(e)}") def process_bucket(row, sts_master, partition, args, overall_pbar, display_manager): slot = display_manager.acquire_slot() start_time = time.perf_counter() - s_acc, s_reg, s_bucket = row['source_account_id'], row['source_region'], row['source_bucket'] d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket'] d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/' - # Detailed report data for this specific row - entry = { - "source_account": s_acc, "source_bucket": s_bucket, "source_region": s_reg, - "dest_account": d_acc, "dest_bucket": d_bucket, "dest_path": d_path, - "object_count": 0, "total_size_bytes": 0, "error_count": 0, "retry_count": 0, - "elapsed_time": 0, "transfer_rate_mbps": 0, "status": "pending" - } + entry = {"account": s_acc, "bucket": s_bucket, "objects": 0, "bytes": 0, "errors": 0, "retries": 0, "status": "failed"} try: - # Authentication - s_sess = boto3.Session(profile_name=args.profile) # Simplified for structure; use assume_role logic as before - # (Insert your specific AssumeRole logic here) - - src_s3 = s_sess.client('s3', region_name=s_reg) - dest_s3 = s_sess.client('s3', region_name=d_reg) - - MARKER_FILE = "archived" - # 1. Marker Check - try: - src_s3.head_object(Bucket=s_bucket, Key=MARKER_FILE) - entry["status"] = "skipped (archived marker exists)" - return entry - except: pass - - # 2. Inventory + # Establish clients using assumed roles + src_s3 = get_assumed_client(sts_master, partition, s_acc, s_reg, args.role_name) + dest_s3 = get_assumed_client(sts_master, partition, d_acc, d_reg, args.role_name) + + # 1. Inventory (Objects & Size) objs = [] paginator = src_s3.get_paginator('list_objects_v2') for page in paginator.paginate(Bucket=s_bucket): for obj in page.get('Contents', []): - if obj['Key'] != MARKER_FILE: + if obj['Key'] != "archived": objs.append(obj) - entry["total_size_bytes"] += obj['Size'] - - entry["object_count"] = len(objs) + entry["bytes"] += obj['Size'] + + entry["objects"] = len(objs) if not objs: - entry["status"] = "success (empty bucket)" - return entry - - # 3. Execution Dashboard - desc = f"Acc {s_acc[-4:]} | {s_bucket[:12]}" - with tqdm(total=len(objs), desc=desc, position=slot, leave=False) as pbar: - for obj in objs: - if not args.dry_run: - success, retries = s3_copy_with_backoff(dest_s3, {'Bucket': s_bucket, 'Key': obj['Key']}, d_bucket, f"{d_path}{obj['Key']}") - entry["retry_count"] += retries - if not success: entry["error_count"] += 1 - pbar.update(1) - - # 4. Finalize - if entry["error_count"] == 0 and not args.dry_run: - src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="archived") - entry["status"] = "success" - elif args.dry_run: - entry["status"] = "dry-run" + entry["status"] = "empty" + else: + desc = f"Acc {s_acc[-4:]} | {s_bucket[:12]}" + with tqdm(total=len(objs), desc=desc, position=slot, leave=False) as pbar: + for obj in objs: + if not args.dry_run: + # Server-side copy + dest_s3.copy({'Bucket': s_bucket, 'Key': obj['Key']}, d_bucket, f"{d_path}{obj['Key']}") + pbar.update(1) + + if not args.dry_run: + src_s3.put_object(Bucket=s_bucket, Key="archived", Body="archived") + entry["status"] = "success" + else: + entry["status"] = "dry-run" except Exception as e: entry["status"] = f"failed: {str(e)}" finally: - entry["elapsed_time"] = round(time.perf_counter() - start_time, 2) - if entry["elapsed_time"] > 0: - entry["transfer_rate_mbps"] = round((entry["total_size_bytes"] / 1e6) / entry["elapsed_time"], 2) + entry["elapsed"] = round(time.perf_counter() - start_time, 2) overall_pbar.update(1) display_manager.release_slot(slot) return entry def main(): args = get_args() - # Print Header with Arguments print("="*80) - print(f"S3 ORG MIGRATOR v{__version__}") - print(f"COMMAND: --profile {args.profile} --region {args.region} --role-name {args.role_name} --threads {args.threads} --dry-run {args.dry_run}") - print(f"INPUT: {args.input} | OUTPUT: {args.output}") + print(f"S3 ORG MIGRATOR v{__version__} | Active Profile: {args.profile}") + print(f"Threads: {args.threads} | Dry Run: {args.dry_run}") print("="*80) + # 1. Setup Master Session from Profile master_session = boto3.Session(profile_name=args.profile, region_name=args.region) - sts = master_session.client('sts') - partition = sts.get_caller_identity()['Arn'].split(':')[1] + sts_master = master_session.client('sts') + identity = sts_master.get_caller_identity() + partition = identity['Arn'].split(':')[1] with open(args.input, 'r') as f: rows = list(csv.DictReader(f)) @@ -162,33 +140,18 @@ def main(): with tqdm(total=len(rows), desc="OVERALL PROGRESS", position=0) as overall_pbar: with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: - futures = [executor.submit(process_bucket, row, sts, partition, args, overall_pbar, display_manager) for row in rows] + futures = [executor.submit(process_bucket, row, sts_master, partition, args, overall_pbar, display_manager) for row in rows] for fut in concurrent.futures.as_completed(futures): final_results.append(fut.result()) - # Write per-row output file - with open(args.output, 'w') as f: - json.dump(final_results, f, indent=4) - - # Summary Statistics - total_acc = len(set(r['source_account'] for r in final_results)) - total_buckets = len(final_results) - total_objs = sum(r['object_count'] for r in final_results) - total_bytes = sum(r['total_size_bytes'] for r in final_results) - total_errors = sum(r['error_count'] for r in final_results) - total_retries = sum(r['retry_count'] for r in final_results) - + # Final Cleanup and Report print("\n" * (args.threads + 2)) - print("="*80) - print("FINAL ORGANIZATIONAL SUMMARY") - print(f"Total Accounts: {total_acc}") - print(f"Total Buckets: {total_buckets}") - print(f"Total Objects: {total_objs}") - print(f"Total Data: {format_size(total_bytes)}") - print(f"Total Errors: {total_errors}") - print(f"Total S3 Retries: {total_retries}") - print(f"Detailed results: {args.output}") - print("="*80) + with open(args.output, "w") as f: json.dump(final_results, f, indent=4) + + total_objs = sum(r.get('objects', 0) for r in final_results) + total_bytes = sum(r.get('bytes', 0) for r in final_results) + print(f"SUMMARY: {len(final_results)} buckets processed. {total_objs} objects ({format_size(total_bytes)}) moved.") + print(f"Results: {args.output}") if __name__ == "__main__": main()