diff --git a/local-app/python-tools/cross-organization/archive.py b/local-app/python-tools/cross-organization/archive.py index 30f85763..d2b4396d 100755 --- a/local-app/python-tools/cross-organization/archive.py +++ b/local-app/python-tools/cross-organization/archive.py @@ -13,14 +13,14 @@ from tqdm import tqdm from botocore.exceptions import ClientError -__version__ = "1.0.14" +__version__ = "1.0.15" -# Setup Enhanced Logging -log_filename = f"migration_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" +# Setup Traceable Logging +log_filename = f"migration_trace_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" logging.basicConfig( filename=log_filename, level=logging.INFO, - format='%(asctime)s | %(levelname)s | %(message)s' + format='%(asctime)s | %(levelname)s | [%(threadName)s] %(message)s' ) logger = logging.getLogger(__name__) @@ -30,7 +30,7 @@ def get_args(): parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}") parser.add_argument("--profile", required=True, help="AWS profile name") - parser.add_argument("--region", required=True, help="Initial STS region") + parser.add_argument("--region", required=True, help="STS/S3 Region (e.g., us-gov-west-1)") parser.add_argument("--role-name", required=True, help="Role to assume") parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket tasks") parser.add_argument("--dry-run", action="store_true", help="Simulate only") @@ -45,16 +45,28 @@ def log_v(msg, verbose=False, level="INFO"): with UI_LOCK: tqdm.write(f"[{level}] {msg}") -def get_client(sts, partition, acc, reg, role_name): +def get_assumed_session(sts_master, partition, acc, reg, role_name, context_label): + """Assumes role and logs explicit identity details for tracing.""" role_arn = f"arn:{partition}:iam::{acc}:role/{role_name}" - res = sts.assume_role(RoleArn=role_arn, RoleSessionName="S3MigratorV14") - c = res['Credentials'] - return boto3.client('s3', - aws_access_key_id=c['AccessKeyId'], - aws_secret_access_key=c['SecretAccessKey'], - aws_session_token=c['SessionToken'], - region_name=reg - ) + try: + response = sts_master.assume_role( + RoleArn=role_arn, + RoleSessionName=f"S3Migrator-v15-{acc}" + ) + creds = response['Credentials'] + assumed_id = response['AssumedRoleUser']['Arn'] + + log_v(f"[{context_label}] ASSUMED IDENTITY: {assumed_id} | REGION: {reg} | PARTITION: {partition}") + + return boto3.Session( + aws_access_key_id=creds['AccessKeyId'], + aws_secret_access_key=creds['SecretAccessKey'], + aws_session_token=creds['SessionToken'], + region_name=reg + ) + except Exception as e: + log_v(f"[{context_label}] FAILED TO ASSUME ROLE {role_arn}: {str(e)}", True, "ERROR") + raise def process_bucket(row, sts_master, partition, args, overall_pbar, slots): slot = slots.acquire() @@ -63,23 +75,34 @@ def process_bucket(row, sts_master, partition, args, overall_pbar, slots): d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/' try: - log_v(f"Task Start: {s_bucket} ({s_acc})", args.verbose) - src_s3 = get_client(sts_master, partition, s_acc, s_reg, args.role_name) - dest_s3 = get_client(sts_master, partition, d_acc, d_reg, args.role_name) - - # Marker Check + log_v(f"--- Processing {s_bucket} in {s_reg} ---", args.verbose) + + # Explicit Sessions for Source and Destination + src_session = get_assumed_session(sts_master, partition, s_acc, s_reg, args.role_name, f"SOURCE:{s_acc}") + dest_session = get_assumed_session(sts_master, partition, d_acc, d_reg, args.role_name, f"DEST:{d_acc}") + + src_s3 = src_session.client('s3') + dest_s3 = dest_session.client('s3') + + # 1. Verification Step: Who am I according to S3? + # This helps debug if the session is using the wrong credentials + whoami = src_session.client('sts').get_caller_identity()['Arn'] + log_v(f"S3 Verification for {s_bucket}: Using Identity {whoami} in Region {src_session.region_name}", args.verbose) + + # 2. Marker Check try: + log_v(f"Checking marker: {s_bucket}/archived...", args.verbose) src_s3.head_object(Bucket=s_bucket, Key="archived") - log_v(f"Skipping {s_bucket}: Already archived.", args.verbose) + log_v(f"Skipping {s_bucket}: Already marked archived.", args.verbose) return except ClientError as e: if e.response['Error']['Code'] == '403': - log_v(f"403 on marker check for {s_bucket}/archived. Likely missing s3:ListBucket.", True, "ERROR") + log_v(f"403 FORBIDDEN on {s_bucket}/archived using identity {whoami}", True, "ERROR") pass - # Inventory - objs = [] + # 3. Inventory and Copy paginator = src_s3.get_paginator('list_objects_v2') + objs = [] for page in paginator.paginate(Bucket=s_bucket): objs.extend(page.get('Contents', [])) @@ -87,19 +110,18 @@ def process_bucket(row, sts_master, partition, args, overall_pbar, slots): for o in objs: try: if not args.dry_run: + # Copy from Source to Dest (using Dest client to initiate) dest_s3.copy({'Bucket': s_bucket, 'Key': o['Key']}, d_bucket, f"{d_path}{o['Key']}") pbar.update(1) except ClientError as e: - if e.response['Error']['Code'] == '403': - log_v(f"403 FORBIDDEN on Key: {o['Key']} in Bucket: {s_bucket}. " - f"Check Object Ownership/ACLs or KMS Decrypt permissions.", True, "ERROR") + log_v(f"FAILED COPY: Key {o['Key']} in {s_bucket} | Identity: {whoami} | Error: {str(e)}", True, "ERROR") raise e if not args.dry_run and objs: src_s3.put_object(Bucket=s_bucket, Key="archived", Body="archived") except Exception as e: - log_v(f"CRITICAL FAIL: {s_bucket} -> {str(e)}", True, "ERROR") + log_v(f"FATAL BUCKET ERROR: {s_bucket} -> {str(e)}", True, "ERROR") finally: overall_pbar.update(1) slots.release(slot) @@ -118,21 +140,27 @@ def release(self, i): def main(): args = get_args() - log_v(f"--- Migration Initiated v{__version__} ---") + log_v(f"--- TRACEABLE MIGRATION START v{__version__} ---") + # Establish Master Session from local profile master_sess = boto3.Session(profile_name=args.profile, region_name=args.region) - sts = master_sess.client('sts') - partition = sts.get_caller_identity()['Arn'].split(':')[1] + sts_master = master_sess.client('sts') + + # Identify Master Partition + id_info = sts_master.get_caller_identity() + partition = id_info['Arn'].split(':')[1] + log_v(f"MASTER IDENTITY: {id_info['Arn']} | PARTITION: {partition} | DEFAULT REGION: {args.region}") with open(args.input, 'r') as f: rows = list(csv.DictReader(f)) slots = SlotManager(args.threads) with tqdm(total=len(rows), desc="OVERALL", position=0) as opbar: with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as ex: - [ex.submit(process_bucket, r, sts, partition, args, opbar, slots) for r in rows] + [ex.submit(process_bucket, r, sts_master, partition, args, opbar, slots) for r in rows] print("\n" * (args.threads + 2)) - log_v(f"--- Migration Finished. Log: {log_filename} ---") + log_v(f"--- Trace log saved to: {log_filename} ---") if __name__ == "__main__": main() +