Skip to content

Commit

Permalink
select appropriate region for sts
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 7, 2026
1 parent eb637a8 commit 4b94e9b
Showing 1 changed file with 60 additions and 32 deletions.
92 changes: 60 additions & 32 deletions local-app/python-tools/cross-organization/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
from tqdm import tqdm
from botocore.exceptions import ClientError

# Tool version; interpolated into the argparse description and the start-up banner.
__version__ = "1.0.15"

# Setup Traceable Logging
# One log file per run, named after the start time of the process.
log_filename = f"migration_trace_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
    filename=log_filename,
    level=logging.INFO,
    # threadName lets interleaved lines from worker threads be told apart.
    format='%(asctime)s | %(levelname)s | [%(threadName)s] %(message)s'
)
logger = logging.getLogger(__name__)

Expand All @@ -30,7 +30,7 @@
def get_args():
parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}")
parser.add_argument("--profile", required=True, help="AWS profile name")
parser.add_argument("--region", required=True, help="Initial STS region")
parser.add_argument("--region", required=True, help="STS/S3 Region (e.g., us-gov-west-1)")
parser.add_argument("--role-name", required=True, help="Role to assume")
parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket tasks")
parser.add_argument("--dry-run", action="store_true", help="Simulate only")
Expand All @@ -45,16 +45,28 @@ def log_v(msg, verbose=False, level="INFO"):
with UI_LOCK:
tqdm.write(f"[{level}] {msg}")

def get_client(sts, partition, acc, reg, role_name):
    """Assume ``role_name`` in account ``acc`` and return an S3 client.

    Retained for call sites that still expect a ready-made S3 client rather
    than a session; see ``get_assumed_session`` for the traced variant.

    Args:
        sts: STS client used to call ``assume_role``.
        partition: ARN partition segment used to build the role ARN.
        acc: Account id that owns the role.
        reg: Region name given to the returned S3 client.
        role_name: Name of the IAM role to assume.

    Returns:
        A boto3 S3 client built from the temporary credentials.
    """
    role_arn = f"arn:{partition}:iam::{acc}:role/{role_name}"
    res = sts.assume_role(RoleArn=role_arn, RoleSessionName="S3MigratorV14")
    c = res['Credentials']
    return boto3.client(
        's3',
        aws_access_key_id=c['AccessKeyId'],
        aws_secret_access_key=c['SecretAccessKey'],
        aws_session_token=c['SessionToken'],
        region_name=reg
    )


def get_assumed_session(sts_master, partition, acc, reg, role_name, context_label):
    """Assumes role and logs explicit identity details for tracing.

    Args:
        sts_master: STS client used to call ``assume_role``.
        partition: ARN partition segment used to build the role ARN.
        acc: Account id that owns the role; also embedded in the session name.
        reg: Region name given to the returned session.
        role_name: Name of the IAM role to assume.
        context_label: Short tag prefixed to every log line emitted here
            (callers pass values such as ``SOURCE:<acc>`` / ``DEST:<acc>``).

    Returns:
        A ``boto3.Session`` carrying the temporary credentials, pinned to ``reg``.

    Raises:
        Exception: any error raised while assuming the role or building the
            session is logged at ERROR level and then re-raised unchanged.
    """
    role_arn = f"arn:{partition}:iam::{acc}:role/{role_name}"
    try:
        response = sts_master.assume_role(
            RoleArn=role_arn,
            RoleSessionName=f"S3Migrator-v15-{acc}"
        )
        creds = response['Credentials']
        assumed_id = response['AssumedRoleUser']['Arn']

        log_v(f"[{context_label}] ASSUMED IDENTITY: {assumed_id} | REGION: {reg} | PARTITION: {partition}")

        return boto3.Session(
            aws_access_key_id=creds['AccessKeyId'],
            aws_secret_access_key=creds['SecretAccessKey'],
            aws_session_token=creds['SessionToken'],
            region_name=reg
        )
    except Exception as e:
        # Log with the full role ARN so the failing account is identifiable,
        # then let the caller decide how to handle the failure.
        log_v(f"[{context_label}] FAILED TO ASSUME ROLE {role_arn}: {e}", True, "ERROR")
        raise

def process_bucket(row, sts_master, partition, args, overall_pbar, slots):
slot = slots.acquire()
Expand All @@ -63,43 +75,53 @@ def process_bucket(row, sts_master, partition, args, overall_pbar, slots):
d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/'

try:
log_v(f"Task Start: {s_bucket} ({s_acc})", args.verbose)
src_s3 = get_client(sts_master, partition, s_acc, s_reg, args.role_name)
dest_s3 = get_client(sts_master, partition, d_acc, d_reg, args.role_name)

# Marker Check
log_v(f"--- Processing {s_bucket} in {s_reg} ---", args.verbose)

# Explicit Sessions for Source and Destination
src_session = get_assumed_session(sts_master, partition, s_acc, s_reg, args.role_name, f"SOURCE:{s_acc}")
dest_session = get_assumed_session(sts_master, partition, d_acc, d_reg, args.role_name, f"DEST:{d_acc}")

src_s3 = src_session.client('s3')
dest_s3 = dest_session.client('s3')

# 1. Verification Step: Who am I according to S3?
# This helps debug if the session is using the wrong credentials
whoami = src_session.client('sts').get_caller_identity()['Arn']
log_v(f"S3 Verification for {s_bucket}: Using Identity {whoami} in Region {src_session.region_name}", args.verbose)

# 2. Marker Check
try:
log_v(f"Checking marker: {s_bucket}/archived...", args.verbose)
src_s3.head_object(Bucket=s_bucket, Key="archived")
log_v(f"Skipping {s_bucket}: Already archived.", args.verbose)
log_v(f"Skipping {s_bucket}: Already marked archived.", args.verbose)
return
except ClientError as e:
if e.response['Error']['Code'] == '403':
log_v(f"403 on marker check for {s_bucket}/archived. Likely missing s3:ListBucket.", True, "ERROR")
log_v(f"403 FORBIDDEN on {s_bucket}/archived using identity {whoami}", True, "ERROR")
pass

# Inventory
objs = []
# 3. Inventory and Copy
paginator = src_s3.get_paginator('list_objects_v2')
objs = []
for page in paginator.paginate(Bucket=s_bucket):
objs.extend(page.get('Contents', []))

with tqdm(total=len(objs), desc=f"Acc {s_acc[-4:]}", position=slot, leave=False) as pbar:
for o in objs:
try:
if not args.dry_run:
# Copy from Source to Dest (using Dest client to initiate)
dest_s3.copy({'Bucket': s_bucket, 'Key': o['Key']}, d_bucket, f"{d_path}{o['Key']}")
pbar.update(1)
except ClientError as e:
if e.response['Error']['Code'] == '403':
log_v(f"403 FORBIDDEN on Key: {o['Key']} in Bucket: {s_bucket}. "
f"Check Object Ownership/ACLs or KMS Decrypt permissions.", True, "ERROR")
log_v(f"FAILED COPY: Key {o['Key']} in {s_bucket} | Identity: {whoami} | Error: {str(e)}", True, "ERROR")
raise e

if not args.dry_run and objs:
src_s3.put_object(Bucket=s_bucket, Key="archived", Body="archived")

except Exception as e:
log_v(f"CRITICAL FAIL: {s_bucket} -> {str(e)}", True, "ERROR")
log_v(f"FATAL BUCKET ERROR: {s_bucket} -> {str(e)}", True, "ERROR")
finally:
overall_pbar.update(1)
slots.release(slot)
Expand All @@ -118,21 +140,27 @@ def release(self, i):

def main():
    """Run the migration: one ``process_bucket`` task per row of the input CSV.

    Reads the command-line arguments, builds an STS client from the local
    profile, derives the ARN partition from the caller identity, then submits
    every CSV row to a thread pool of ``args.threads`` workers.
    """
    args = get_args()
    log_v(f"--- TRACEABLE MIGRATION START v{__version__} ---")

    # Establish Master Session from local profile
    master_sess = boto3.Session(profile_name=args.profile, region_name=args.region)
    sts_master = master_sess.client('sts')

    # Identify Master Partition
    id_info = sts_master.get_caller_identity()
    # The partition is the second colon-separated field of the caller ARN.
    partition = id_info['Arn'].split(':')[1]
    log_v(f"MASTER IDENTITY: {id_info['Arn']} | PARTITION: {partition} | DEFAULT REGION: {args.region}")

    # newline='' is what the csv module requires so that newlines embedded in
    # quoted fields are parsed correctly.
    with open(args.input, 'r', newline='') as f:
        rows = list(csv.DictReader(f))
    slots = SlotManager(args.threads)

    with tqdm(total=len(rows), desc="OVERALL", position=0) as opbar:
        # Leaving the executor context waits for every submitted task.
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as ex:
            for r in rows:
                ex.submit(process_bucket, r, sts_master, partition, args, opbar, slots)

    # Emit blank lines so later output lands below the progress-bar rows
    # (NOTE(review): presumably one row per worker slot plus the overall bar - confirm).
    print("\n" * (args.threads + 2))
    log_v(f"--- Trace log saved to: {log_filename} ---")


if __name__ == "__main__":
    main()

0 comments on commit 4b94e9b

Please sign in to comment.