-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
164 additions
and
0 deletions.
There are no files selected for viewing
164 changes: 164 additions & 0 deletions
164
local-app/python-tools/cross-organization/archive-cloudtrail.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| #!/bin/env python | ||
|
|
||
| import boto3 | ||
| import csv | ||
| import json | ||
| import time | ||
| import argparse | ||
| import concurrent.futures | ||
| from tqdm import tqdm | ||
| from botocore.exceptions import ClientError | ||
|
|
||
| __version__ = "1.0.5" | ||
|
|
||
| def get_args(): | ||
| parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}") | ||
| parser.add_argument("--profile", default="default", help="Initial AWS profile to use") | ||
| parser.add_argument("--region", required=True, help="Initial region for the master STS/S3 connection") | ||
| parser.add_argument("--role-name", required=True, help="Role name to assume in member accounts") | ||
| parser.add_argument("--threads", type=int, default=16, help="Max threads for copying (default: 16)") | ||
| parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without copying data") | ||
| parser.add_argument("--csv", default="migration_data.csv", help="Path to input CSV file") | ||
| return parser.parse_args() | ||
|
|
||
| def get_assumed_session(sts_master, partition, account_id, region, role_name): | ||
| """Assume role into member account using the master identity.""" | ||
| try: | ||
| role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}" | ||
| res = sts_master.assume_role( | ||
| RoleArn=role_arn, | ||
| RoleSessionName="S3MigrationWorker" | ||
| ) | ||
| c = res['Credentials'] | ||
| return boto3.Session( | ||
| aws_access_key_id=c['AccessKeyId'], | ||
| aws_secret_access_key=c['SecretAccessKey'], | ||
| aws_session_token=c['SessionToken'], | ||
| region_name=region | ||
| ) | ||
| except Exception as e: | ||
| return None | ||
|
|
||
| def copy_worker(dest_s3, s_bucket, s_key, d_bucket, d_key, pbar, dry_run): | ||
| """Execution worker for individual object copies.""" | ||
| if dry_run: | ||
| pbar.update(1) | ||
| return True | ||
| try: | ||
| # Server-side copy: S3 handles data transfer directly | ||
| dest_s3.copy({'Bucket': s_bucket, 'Key': s_key}, d_bucket, d_key) | ||
| pbar.update(1) | ||
| return True | ||
| except Exception: | ||
| return False | ||
|
|
||
| def process_bucket(row, sts_master, partition, args, overall_pbar): | ||
| start_time = time.perf_counter() | ||
| s_acc = row['source_account_id'] | ||
| s_bucket = row['source_bucket_name'] | ||
| s_reg = row['source_region'] | ||
| d_acc = row['destination_account_id'] | ||
| d_bucket = row['destination_bucket_name'] | ||
| d_reg = row['destination_region'] | ||
|
|
||
| # Path Logic: Default to archive/source_account_id if blank | ||
| d_path = row.get('destination_path', '').strip() or f"archive/{s_acc}" | ||
| d_path = d_path.rstrip('/') + '/' | ||
|
|
||
| # Establish Sessions | ||
| src_session = get_assumed_session(sts_master, partition, s_acc, s_reg, args.role_name) | ||
| dest_session = get_assumed_session(sts_master, partition, d_acc, d_reg, args.role_name) | ||
|
|
||
| if not src_session or not dest_session: | ||
| overall_pbar.write(f"ERROR: Could not assume roles for Acc {s_acc}") | ||
| overall_pbar.update(1) | ||
| return | ||
|
|
||
| src_s3 = src_session.client('s3') | ||
| dest_s3 = dest_session.client('s3') | ||
|
|
||
| # 1. Marker Check | ||
| MARKER_FILE = "archived" | ||
| try: | ||
| src_s3.head_object(Bucket=s_bucket, Key=MARKER_FILE) | ||
| overall_pbar.write(f"SKIP: {s_bucket} already archived.") | ||
| overall_pbar.update(1) | ||
| return | ||
| except ClientError: | ||
| pass | ||
|
|
||
| # 2. Inventory | ||
| objects = [] | ||
| total_bytes = 0 | ||
| paginator = src_s3.get_paginator('list_objects_v2') | ||
| for page in paginator.paginate(Bucket=s_bucket): | ||
| for obj in page.get('Contents', []): | ||
| if obj['Key'] != MARKER_FILE: | ||
| objects.append(obj) | ||
| total_bytes += obj['Size'] | ||
|
|
||
| if not objects: | ||
| overall_pbar.update(1) | ||
| return | ||
|
|
||
| # 3. High-Performance Multi-threaded Copy | ||
| desc = f"Copying Acc {s_acc[-4:]}" | ||
| with tqdm(total=len(objects), desc=desc, unit="obj", leave=False, position=1) as pbar: | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: | ||
| futures = [] | ||
| for obj in objects: | ||
| target_key = f"{d_path}{obj['Key']}" | ||
| futures.append(executor.submit( | ||
| copy_worker, dest_s3, s_bucket, obj['Key'], d_bucket, target_key, pbar, args.dry_run | ||
| )) | ||
|
|
||
| results = [f.result() for f in concurrent.futures.as_completed(futures)] | ||
|
|
||
| # 4. Reporting & Finalizing | ||
| if all(results) and not args.dry_run: | ||
| elapsed = time.perf_counter() - start_time | ||
| src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="migrated") | ||
|
|
||
| report = { | ||
| "source": {"account": s_acc, "bucket": s_bucket, "region": s_reg}, | ||
| "destination": {"account": d_acc, "bucket": d_bucket, "region": d_reg, "path": d_path}, | ||
| "stats": { | ||
| "objects": len(objects), | ||
| "bytes": total_bytes, | ||
| "seconds": round(elapsed, 2), | ||
| "mb_per_sec": round((total_bytes/1e6)/elapsed, 2) if elapsed > 0 else 0 | ||
| } | ||
| } | ||
| with open(f"log_{s_acc}_{s_reg}.json", "w") as f: | ||
| json.dump(report, f, indent=4) | ||
|
|
||
| elif args.dry_run: | ||
| overall_pbar.write(f"DRY RUN: Would have moved {len(objects)} objects ({total_bytes/1e6:.2f} MB) from {s_bucket}") | ||
|
|
||
| overall_pbar.update(1) | ||
|
|
||
| def main(): | ||
| args = get_args() | ||
|
|
||
| # 1. Initialize Master Session with Profile and Region | ||
| master_session = boto3.Session(profile_name=args.profile, region_name=args.region) | ||
| sts_master = master_session.client('sts') | ||
|
|
||
| # 2. Identify Partition | ||
| identity = sts_master.get_caller_identity() | ||
| partition = identity['Arn'].split(':')[1] | ||
|
|
||
| print(f"--- S3 Org Migrator v{__version__} ---") | ||
| print(f"Partition: {partition} | Profile: {args.profile} | Region: {args.region}") | ||
| if args.dry_run: print("!!! DRY RUN ENABLED !!!") | ||
|
|
||
| # 3. Process CSV | ||
| with open(args.csv, 'r') as f: | ||
| rows = list(csv.DictReader(f)) | ||
|
|
||
| with tqdm(total=len(rows), desc="Total Progress", position=0) as overall_pbar: | ||
| for row in rows: | ||
| process_bucket(row, sts_master, partition, args, overall_pbar) | ||
|
|
||
| if __name__ == "__main__": | ||
| main() |