diff --git a/local-app/python-tools/cross-organization/archive.py b/local-app/python-tools/cross-organization/archive.py index 8153f73d..0d5826ea 100755 --- a/local-app/python-tools/cross-organization/archive.py +++ b/local-app/python-tools/cross-organization/archive.py @@ -7,151 +7,145 @@ import argparse import concurrent.futures import threading +import logging import random from datetime import datetime from tqdm import tqdm from botocore.exceptions import ClientError -__version__ = "1.0.12" +__version__ = "1.0.13" -# Global lock for thread-safe UI updates +# Setup Logging to File +logging.basicConfig( + filename='archive.log', + level=logging.INFO, + format='%(asctime)s | %(levelname)s | %(message)s' +) +logger = logging.getLogger(__name__) + +# UI Synchronization UI_LOCK = threading.RLock() tqdm.set_lock(UI_LOCK) def get_args(): parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}") - parser.add_argument("--profile", required=True, help="Local AWS profile name (source of credentials)") - parser.add_argument("--region", required=True, help="Initial region for master STS connection") - parser.add_argument("--role-name", required=True, help="Role name to assume in source/dest accounts") - parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations") - parser.add_argument("--dry-run", action="store_true", help="Dry run mode") - parser.add_argument("--input", default="migration_data.csv", help="Input CSV file") + parser.add_argument("--profile", required=True, help="AWS profile name") + parser.add_argument("--region", required=True, help="Initial STS region") + parser.add_argument("--role-name", required=True, help="Role to assume") + parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket tasks") + parser.add_argument("--dry-run", action="store_true", help="Simulate only") + parser.add_argument("--verbose", action="store_true", help="Print detailed operation steps") + parser.add_argument("--input", default="migration_data.csv", help="Input CSV") ts = datetime.now().strftime("%Y%m%d_%H%M%S") - parser.add_argument("--output", default=f"archive_results_{ts}.json", help="Summary output file") + parser.add_argument("--output", default=f"results_{ts}.json", help="Summary JSON") return parser.parse_args() -def format_size(bytes): - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: - if bytes < 1024.0: return f"{bytes:.2f} {unit}" - bytes /= 1024.0 - -class DisplayManager: - def __init__(self, max_slots): - self.slots = list(range(1, max_slots + 1)) - self.lock = threading.Lock() - for _ in range(max_slots + 2): print("") - print(f"\033[{max_slots + 2}A", end="") - - def acquire_slot(self): - with self.lock: return self.slots.pop(0) if self.slots else None - - def release_slot(self, slot): - if slot is not None: - with self.lock: - self.slots.append(slot) - self.slots.sort() - -def get_assumed_client(sts_client, partition, account_id, region, role_name, service='s3'): - """Assume role using the master STS client and return a service client.""" - role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}" - try: - response = sts_client.assume_role( - RoleArn=role_arn, - RoleSessionName=f"MigrationSession-{account_id}" - ) - creds = response['Credentials'] - return boto3.client( - service, - aws_access_key_id=creds['AccessKeyId'], - aws_secret_access_key=creds['SecretAccessKey'], - aws_session_token=creds['SessionToken'], - region_name=region - ) - except Exception as e: - raise Exception(f"AssumeRole failed for {account_id}: {str(e)}") - -def process_bucket(row, sts_master, partition, args, overall_pbar, display_manager): - slot = display_manager.acquire_slot() - start_time = time.perf_counter() +def log_v(msg, verbose=False): + """Logs to file and optionally to console if verbose is enabled.""" + logger.info(msg) + if verbose: + with UI_LOCK: + tqdm.write(f"[VERBOSE] {msg}") + +def get_client(sts, partition, acc, reg, role_name, service='s3'): + log_v(f"Assuming role in {acc} for {service}...") + role_arn = f"arn:{partition}:iam::{acc}:role/{role_name}" + res = sts.assume_role(RoleArn=role_arn, RoleSessionName="S3MigrationV13") + c = res['Credentials'] + return boto3.client( + service, + aws_access_key_id=c['AccessKeyId'], + aws_secret_access_key=c['SecretAccessKey'], + aws_session_token=c['SessionToken'], + region_name=reg + ) + +def process_bucket(row, sts_master, partition, args, overall_pbar, slots): + slot = slots.acquire() s_acc, s_reg, s_bucket = row['source_account_id'], row['source_region'], row['source_bucket'] d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket'] d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/' - - entry = {"account": s_acc, "bucket": s_bucket, "objects": 0, "bytes": 0, "errors": 0, "retries": 0, "status": "failed"} + + entry = {"account": s_acc, "bucket": s_bucket, "status": "failed", "objects": 0} + log_v(f"Starting Task: Source {s_bucket} ({s_acc}) -> Dest {d_bucket} ({d_acc})", args.verbose) try: - # Establish clients using assumed roles - src_s3 = get_assumed_client(sts_master, partition, s_acc, s_reg, args.role_name) - dest_s3 = get_assumed_client(sts_master, partition, d_acc, d_reg, args.role_name) - - # 1. Inventory (Objects & Size) + src_s3 = get_client(sts_master, partition, s_acc, s_reg, args.role_name) + dest_s3 = get_client(sts_master, partition, d_acc, d_reg, args.role_name) + + # 1. Check for marker + try: + log_v(f"Checking for marker file 'archived' in {s_bucket}...", args.verbose) + src_s3.head_object(Bucket=s_bucket, Key="archived") + entry["status"] = "skipped" + log_v(f"Bucket {s_bucket} already archived. Skipping.", args.verbose) + return entry + except ClientError as e: + if e.response['Error']['Code'] == '403': + log_v(f"CRITICAL: 403 on HeadObject for {s_bucket}/archived. Check s3:ListBucket + s3:GetObject permissions.", True) + raise e + + # 2. Inventory + log_v(f"Listing objects in {s_bucket}...", args.verbose) objs = [] paginator = src_s3.get_paginator('list_objects_v2') for page in paginator.paginate(Bucket=s_bucket): - for obj in page.get('Contents', []): - if obj['Key'] != "archived": - objs.append(obj) - entry["bytes"] += obj['Size'] - + objs.extend(page.get('Contents', [])) + entry["objects"] = len(objs) - if not objs: - entry["status"] = "empty" - else: - desc = f"Acc {s_acc[-4:]} | {s_bucket[:12]}" - with tqdm(total=len(objs), desc=desc, position=slot, leave=False) as pbar: - for obj in objs: - if not args.dry_run: - # Server-side copy - dest_s3.copy({'Bucket': s_bucket, 'Key': obj['Key']}, d_bucket, f"{d_path}{obj['Key']}") - pbar.update(1) - - if not args.dry_run: - src_s3.put_object(Bucket=s_bucket, Key="archived", Body="archived") - entry["status"] = "success" - else: - entry["status"] = "dry-run" + log_v(f"Found {len(objs)} objects in {s_bucket}.", args.verbose) + + # 3. Copy with Progress + with tqdm(total=len(objs), desc=f"Acc {s_acc[-4:]}", position=slot, leave=False) as pbar: + for o in objs: + if not args.dry_run: + dest_s3.copy({'Bucket': s_bucket, 'Key': o['Key']}, d_bucket, f"{d_path}{o['Key']}") + pbar.update(1) + + if not args.dry_run: + src_s3.put_object(Bucket=s_bucket, Key="archived", Body="archived") + entry["status"] = "success" except Exception as e: - entry["status"] = f"failed: {str(e)}" + log_v(f"TASK FAILED: {s_bucket} error: {str(e)}", True) + entry["status"] = f"error: {str(e)}" finally: - entry["elapsed"] = round(time.perf_counter() - start_time, 2) overall_pbar.update(1) - display_manager.release_slot(slot) + slots.release(slot) return entry +class SlotManager: + def __init__(self, n): + self.s = list(range(1, n + 1)) + self.l = threading.Lock() + for _ in range(n + 2): print("") + print(f"\033[{n + 2}A", end="") + def acquire(self): + with self.l: return self.s.pop(0) if self.s else None + def release(self, i): + if i: + with self.l: self.s.append(i); self.s.sort() + def main(): args = get_args() - print("="*80) - print(f"S3 ORG MIGRATOR v{__version__} | Active Profile: {args.profile}") - print(f"Threads: {args.threads} | Dry Run: {args.dry_run}") - print("="*80) - - # 1. Setup Master Session from Profile - master_session = boto3.Session(profile_name=args.profile, region_name=args.region) - sts_master = master_session.client('sts') - identity = sts_master.get_caller_identity() - partition = identity['Arn'].split(':')[1] - - with open(args.input, 'r') as f: - rows = list(csv.DictReader(f)) - - display_manager = DisplayManager(max_slots=args.threads) - final_results = [] - - with tqdm(total=len(rows), desc="OVERALL PROGRESS", position=0) as overall_pbar: - with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: - futures = [executor.submit(process_bucket, row, sts_master, partition, args, overall_pbar, display_manager) for row in rows] - for fut in concurrent.futures.as_completed(futures): - final_results.append(fut.result()) - - # Final Cleanup and Report - print("\n" * (args.threads + 2)) - with open(args.output, "w") as f: json.dump(final_results, f, indent=4) + log_v(f"--- SESSION START: v{__version__} ---") - total_objs = sum(r.get('objects', 0) for r in final_results) - total_bytes = sum(r.get('bytes', 0) for r in final_results) - print(f"SUMMARY: {len(final_results)} buckets processed. {total_objs} objects ({format_size(total_bytes)}) moved.") - print(f"Results: {args.output}") + master_sess = boto3.Session(profile_name=args.profile, region_name=args.region) + sts = master_sess.client('sts') + partition = sts.get_caller_identity()['Arn'].split(':')[1] + + with open(args.input, 'r') as f: rows = list(csv.DictReader(f)) + slots = SlotManager(args.threads) + results = [] + + with tqdm(total=len(rows), desc="OVERALL", position=0) as opbar: + with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as ex: + futs = [ex.submit(process_bucket, r, sts, partition, args, opbar, slots) for r in rows] + for f in concurrent.futures.as_completed(futs): results.append(f.result()) + + print("\n" * (args.threads + 2)) + with open(args.output, 'w') as f: json.dump(results, f, indent=4) + log_v(f"--- SESSION END: {len(results)} rows processed ---") if __name__ == "__main__": main()