#!/usr/bin/env python3
"""S3 Org Migrator: copy whole S3 buckets between organization accounts.

Source path of this script in the repository:
local-app/python-tools/cross-organization/archive-cloudtrail.py

For every row of the input CSV the tool:

1. assumes ``--role-name`` in the source and in the destination account,
2. skips the source bucket if it already contains the marker object
   (``MARKER_FILE``),
3. lists every object in the source bucket,
4. copies each object to ``<destination_path>/<original key>`` in the
   destination bucket using a thread pool,
5. on full success writes the marker object to the source bucket and a JSON
   report ``log_<source account>_<source region>.json`` to the current
   directory.

Required CSV columns: ``source_account_id``, ``source_bucket_name``,
``source_region``, ``destination_account_id``, ``destination_bucket_name``,
``destination_region``.  Optional column: ``destination_path`` (defaults to
``archive/<source_account_id>``).
"""

import argparse
import concurrent.futures
import csv
import json
import time

import boto3
from botocore.exceptions import ClientError
from tqdm import tqdm

__version__ = "1.0.5"

# Key of the marker object written to a source bucket once every object in it
# has been copied; its presence makes later runs skip the bucket.
MARKER_FILE = "archived"

# Error codes that mean "the object is not there" (as opposed to access
# denied, throttling, ...).
_MISSING_OBJECT_CODES = frozenset({"404", "NoSuchKey", "NotFound"})

# Columns process_bucket() reads unconditionally from every CSV row.
_REQUIRED_COLUMNS = (
    "source_account_id",
    "source_bucket_name",
    "source_region",
    "destination_account_id",
    "destination_bucket_name",
    "destination_region",
)


def get_args():
    """Parse and validate the command-line arguments.

    Returns:
        argparse.Namespace with ``profile``, ``region``, ``role_name``,
        ``threads``, ``dry_run`` and ``csv``.
    """
    parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}")
    parser.add_argument("--profile", default="default", help="Initial AWS profile to use")
    parser.add_argument("--region", required=True, help="Initial region for the master STS/S3 connection")
    parser.add_argument("--role-name", required=True, help="Role name to assume in member accounts")
    parser.add_argument("--threads", type=int, default=16, help="Max threads for copying (default: 16)")
    parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without copying data")
    parser.add_argument("--csv", default="migration_data.csv", help="Path to input CSV file")
    args = parser.parse_args()
    # ThreadPoolExecutor rejects max_workers <= 0; fail early with a clear
    # message instead of after the first bucket has been inventoried.
    if args.threads < 1:
        parser.error("--threads must be at least 1")
    return args


def get_assumed_session(sts_master, partition, account_id, region, role_name):
    """Assume role into member account using the master identity.

    Args:
        sts_master: STS client created from the initial (master) session.
        partition: AWS partition used to build the role ARN.
        account_id: Account that owns the role.
        region: Region the returned session is bound to.
        role_name: Name of the role to assume.

    Returns:
        A ``boto3.Session`` carrying the temporary credentials, or ``None``
        when the role could not be assumed (the reason is written to the
        console).
    """
    role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}"
    try:
        res = sts_master.assume_role(
            RoleArn=role_arn,
            RoleSessionName="S3MigrationWorker"
        )
        c = res['Credentials']
        return boto3.Session(
            aws_access_key_id=c['AccessKeyId'],
            aws_secret_access_key=c['SecretAccessKey'],
            aws_session_token=c['SessionToken'],
            region_name=region
        )
    except Exception as exc:
        # Deliberately broad: callers rely on the None return to skip the
        # bucket, but the cause must not be lost.
        tqdm.write(f"WARN: Could not assume {role_arn}: {exc}")
        return None


def copy_worker(dest_s3, s_bucket, s_key, d_bucket, d_key, pbar, dry_run):
    """Execution worker for individual object copies.

    Args:
        dest_s3: S3 client of the destination session; it issues the copy.
        s_bucket: Source bucket name.
        s_key: Source object key.
        d_bucket: Destination bucket name.
        d_key: Destination object key.
        pbar: Progress bar advanced by one for every object that succeeds.
        dry_run: When true nothing is copied and the object counts as done.

    Returns:
        ``True`` on success (or dry run), ``False`` when the copy failed.
    """
    if dry_run:
        pbar.update(1)
        return True
    try:
        # Server-side copy: S3 handles data transfer directly
        dest_s3.copy({'Bucket': s_bucket, 'Key': s_key}, d_bucket, d_key)
    except Exception as exc:
        # Deliberately broad so one bad object never kills the thread pool;
        # the failure is reported and reflected in the return value.
        pbar.write(f"ERROR: Copy failed s3://{s_bucket}/{s_key} -> s3://{d_bucket}/{d_key}: {exc}")
        return False
    pbar.update(1)
    return True


def _destination_prefix(row):
    """Return the destination key prefix for *row*, always ending in '/'."""
    # csv.DictReader fills columns missing from a short row with None, so the
    # value must be guarded before calling strip().
    custom = (row.get('destination_path') or '').strip()
    # Path Logic: Default to archive/source_account_id if blank
    prefix = custom or f"archive/{row['source_account_id']}"
    return prefix.rstrip('/') + '/'


def _marker_exists(src_s3, bucket):
    """Return True if the marker object exists in *bucket*.

    Raises:
        ClientError: for any failure other than "object not found", so that
            an access or throttling error is never mistaken for "not yet
            archived".
    """
    try:
        src_s3.head_object(Bucket=bucket, Key=MARKER_FILE)
    except ClientError as exc:
        code = exc.response.get('Error', {}).get('Code', '')
        if code in _MISSING_OBJECT_CODES:
            return False
        raise
    return True


def _list_objects(src_s3, bucket):
    """Return ``(objects, total_bytes)`` for every non-marker object in *bucket*."""
    objects = []
    total_bytes = 0
    paginator = src_s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            if obj['Key'] != MARKER_FILE:
                objects.append(obj)
                total_bytes += obj['Size']
    return objects, total_bytes


def _copy_objects(dest_s3, s_bucket, d_bucket, d_path, objects, s_acc, args):
    """Copy *objects* with a thread pool and return one bool per object."""
    desc = f"Copying Acc {s_acc[-4:]}"
    with tqdm(total=len(objects), desc=desc, unit="obj", leave=False, position=1) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
            futures = [
                executor.submit(
                    copy_worker, dest_s3, s_bucket, obj['Key'], d_bucket,
                    f"{d_path}{obj['Key']}", pbar, args.dry_run
                )
                for obj in objects
            ]
            # copy_worker never raises, so result() only yields True/False.
            return [f.result() for f in concurrent.futures.as_completed(futures)]


def process_bucket(row, sts_master, partition, args, overall_pbar):
    """Migrate the bucket described by one CSV row.

    Args:
        row: Mapping with the CSV columns listed in the module docstring.
        sts_master: STS client of the master session.
        partition: AWS partition used to build role ARNs.
        args: Parsed command-line arguments (``role_name``, ``threads``,
            ``dry_run`` are read).
        overall_pbar: Overall progress bar; advanced by exactly one per call
            and used for console messages.

    Bucket-level AWS errors are reported and the bucket is skipped so the
    remaining rows are still processed.
    """
    start_time = time.perf_counter()
    try:
        s_acc = row['source_account_id']
        s_bucket = row['source_bucket_name']
        s_reg = row['source_region']
        d_acc = row['destination_account_id']
        d_bucket = row['destination_bucket_name']
        d_reg = row['destination_region']
        d_path = _destination_prefix(row)

        # Establish Sessions
        src_session = get_assumed_session(sts_master, partition, s_acc, s_reg, args.role_name)
        dest_session = get_assumed_session(sts_master, partition, d_acc, d_reg, args.role_name)
        if not src_session or not dest_session:
            overall_pbar.write(f"ERROR: Could not assume roles for Acc {s_acc}")
            return

        src_s3 = src_session.client('s3')
        dest_s3 = dest_session.client('s3')

        try:
            # 1. Marker Check
            if _marker_exists(src_s3, s_bucket):
                overall_pbar.write(f"SKIP: {s_bucket} already archived.")
                return

            # 2. Inventory
            objects, total_bytes = _list_objects(src_s3, s_bucket)
            if not objects:
                return

            # 3. High-Performance Multi-threaded Copy
            results = _copy_objects(dest_s3, s_bucket, d_bucket, d_path, objects, s_acc, args)

            # 4. Reporting & Finalizing
            if args.dry_run:
                overall_pbar.write(f"DRY RUN: Would have moved {len(objects)} objects ({total_bytes/1e6:.2f} MB) from {s_bucket}")
                return

            failed = results.count(False)
            if failed:
                # Leave the bucket unmarked so a later run retries it.
                overall_pbar.write(
                    f"ERROR: {failed} of {len(objects)} objects failed to copy from {s_bucket}; "
                    "bucket was not marked as archived."
                )
                return

            elapsed = time.perf_counter() - start_time
            src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="migrated")
        except ClientError as exc:
            overall_pbar.write(f"ERROR: {s_bucket} (Acc {s_acc}): {exc}")
            return

        report = {
            "source": {"account": s_acc, "bucket": s_bucket, "region": s_reg},
            "destination": {"account": d_acc, "bucket": d_bucket, "region": d_reg, "path": d_path},
            "stats": {
                "objects": len(objects),
                "bytes": total_bytes,
                "seconds": round(elapsed, 2),
                "mb_per_sec": round((total_bytes/1e6)/elapsed, 2) if elapsed > 0 else 0
            }
        }
        # NOTE(review): the file name has no bucket component, so two buckets
        # in the same account and region overwrite each other's report.
        # Kept as-is in case other tooling depends on the name - confirm.
        with open(f"log_{s_acc}_{s_reg}.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=4)
    finally:
        # Exactly one tick per row, whatever path was taken above.
        overall_pbar.update(1)


def main():
    """Entry point: read the CSV and process every bucket row in order."""
    args = get_args()

    # 1. Initialize Master Session with Profile and Region
    master_session = boto3.Session(profile_name=args.profile, region_name=args.region)
    sts_master = master_session.client('sts')

    # 2. Identify Partition
    identity = sts_master.get_caller_identity()
    partition = identity['Arn'].split(':')[1]

    print(f"--- S3 Org Migrator v{__version__} ---")
    print(f"Partition: {partition} | Profile: {args.profile} | Region: {args.region}")
    if args.dry_run:
        print("!!! DRY RUN ENABLED !!!")

    # 3. Process CSV
    # newline='' is what the csv module expects; utf-8-sig drops the BOM that
    # spreadsheet exports put in front of the first header name.
    with open(args.csv, 'r', newline='', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        header = reader.fieldnames or []

    missing = [col for col in _REQUIRED_COLUMNS if col not in header]
    if rows and missing:
        raise SystemExit(f"ERROR: {args.csv} is missing required column(s): {', '.join(missing)}")

    with tqdm(total=len(rows), desc="Total Progress", position=0) as overall_pbar:
        for row in rows:
            process_bucket(row, sts_master, partition, args, overall_pbar)


if __name__ == "__main__":
    main()