Skip to content

Commit

Permalink
split buckets and progress bars
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 7, 2026
1 parent c08407b commit bff2c4f
Showing 1 changed file with 102 additions and 115 deletions.
217 changes: 102 additions & 115 deletions local-app/python-tools/cross-organization/archive-cloudtrail.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,175 +6,162 @@
import time
import argparse
import concurrent.futures
import threading
import random
from tqdm import tqdm
from botocore.exceptions import ClientError
from botocore.exceptions import ClientError, BotoCoreError

__version__ = "1.0.5"
__version__ = "1.0.7"

def get_args():
    """Parse the command-line options for the migrator.

    Every option is registered exactly once: argparse raises an
    ArgumentError ("conflicting option string") if the same flag is
    added to a parser twice.

    Returns:
        argparse.Namespace with the attributes ``profile``, ``region``,
        ``role_name``, ``threads``, ``dry_run`` and ``input``.
    """
    parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}")
    parser.add_argument("--profile", default="default", help="Initial AWS profile")
    parser.add_argument("--region", required=True, help="Initial region for master connection")
    parser.add_argument("--role-name", required=True, help="Role name to assume across accounts")
    parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations (default: 16)")
    parser.add_argument("--dry-run", action="store_true", help="Dry run mode")
    parser.add_argument("--input", default="migration_data.csv", help="Input CSV file")
    return parser.parse_args()

def format_size(bytes):
    """Render a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit (TB) is reached, so very large values are still reported
    (in TB) instead of falling through and returning None.

    Args:
        bytes: Size in bytes (int or float). The parameter name is kept for
            backward compatibility even though it shadows the builtin.

    Returns:
        A string such as ``"1.50 KB"``.
    """
    size = float(bytes)
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    for unit in units[:-1]:
        if size < 1024.0:
            return f"{size:.2f} {unit}"
        size /= 1024.0
    # Anything that survived the loop is expressed in the largest unit.
    return f"{size:.2f} {units[-1]}"

class DisplayManager:
    """Hands out terminal row numbers so concurrent tqdm bars do not overlap."""

    def __init__(self, max_slots):
        # Rows 1..max_slots form the pool; row 0 is never handed out.
        self.slots = [row for row in range(1, max_slots + 1)]
        self.lock = threading.Lock()

    def acquire_slot(self):
        """Take the lowest free row, or return None when the pool is empty."""
        with self.lock:
            if not self.slots:
                return None
            return self.slots.pop(0)

    def release_slot(self, slot):
        """Put a row back into the pool; a None slot is ignored."""
        if slot is None:
            return
        with self.lock:
            self.slots.append(slot)
            # Keep the pool ordered so the lowest row is reused first.
            self.slots.sort()

def get_assumed_session(sts_master, partition, account_id, region, role_name):
    """Assume a role in a member account and wrap the credentials in a session.

    Args:
        sts_master: STS client of the master identity; only ``assume_role``
            is called on it.
        partition: ARN partition segment used to build the role ARN.
        account_id: Account that owns the role.
        region: Region name given to the returned session.
        role_name: Name of the role to assume.

    Returns:
        A ``boto3.Session`` built from the temporary credentials, or ``None``
        when assuming the role (or reading its credentials) fails for any
        reason. Callers treat ``None`` as "could not assume role".
    """
    role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}"
    try:
        # Exactly one STS call per session; the result carries the credentials.
        res = sts_master.assume_role(RoleArn=role_arn, RoleSessionName="S3MigratorV107")
        c = res['Credentials']
        return boto3.Session(
            aws_access_key_id=c['AccessKeyId'],
            aws_secret_access_key=c['SecretAccessKey'],
            aws_session_token=c['SessionToken'],
            region_name=region
        )
    except Exception:
        # Deliberate best-effort: the caller reports the failure.
        return None

def copy_worker(dest_s3, s_bucket, s_key, d_bucket, d_key, pbar, dry_run):
    """Copy a single object and advance the progress bar.

    In dry-run mode nothing is copied and the bar is simply advanced.
    Returns True on success (or dry run) and False if the copy, or the
    progress update that follows it, raised an exception.
    """
    succeeded = True
    if dry_run:
        pbar.update(1)
    else:
        try:
            # The copy is issued through the destination client.
            dest_s3.copy(dict(Bucket=s_bucket, Key=s_key), d_bucket, d_key)
            pbar.update(1)
        except Exception:
            succeeded = False
    return succeeded

def process_bucket(row, sts_master, partition, args, overall_pbar):
except Exception: return None

def s3_copy_with_backoff(dest_s3, copy_source, d_bucket, d_key, max_retries=5):
    """Copy one object, retrying with exponential backoff on SlowDown errors.

    Args:
        dest_s3: Client whose ``copy(copy_source, bucket, key)`` is called.
        copy_source: Dict identifying the source object (passed through).
        d_bucket: Destination bucket name.
        d_key: Destination object key.
        max_retries: Maximum number of copy attempts.

    Returns:
        True if an attempt succeeded; False on a non-retryable error, when
        the attempts are exhausted, or when ``max_retries`` allows no
        attempt at all. The result is always a bool, never None.
    """
    for attempt in range(max_retries):
        try:
            dest_s3.copy(copy_source, d_bucket, d_key)
            return True
        except ClientError as e:
            code = e.response.get('Error', {}).get('Code')
            # Only throttling is retried, and never after the final attempt.
            if code != 'SlowDown' or attempt >= max_retries - 1:
                return False
            # Exponential backoff with jitter so parallel workers do not
            # retry in lockstep.
            time.sleep((2 ** attempt) + random.random())
        except Exception:
            # Covers BotoCoreError and anything else: treat as a failed copy.
            return False
    # Reached only when max_retries <= 0 (the loop body never ran).
    return False

def process_bucket(row, sts_master, partition, args, overall_pbar, display_manager):
    """Migrate one CSV row (one source bucket) and do the progress bookkeeping.

    A display slot is acquired for the per-bucket progress bar. Whatever
    happens during the migration, the overall bar is advanced exactly once
    and the slot is handed back, so a failing bucket cannot leak a slot or
    leave the overall bar short.

    Args:
        row: Mapping with the keys ``source_account_id``, ``source_region``,
            ``source_bucket``, ``destination_account_id``,
            ``destination_region``, ``destination_bucket`` and optionally
            ``destination_path``.
        sts_master: STS client used to assume roles.
        partition: ARN partition segment.
        args: Parsed options; ``role_name`` and ``dry_run`` are read here.
        overall_pbar: Overall progress bar (``write`` and ``update`` are used).
        display_manager: Provides ``acquire_slot`` / ``release_slot``.
    """
    slot = display_manager.acquire_slot()
    try:
        _migrate_bucket(row, sts_master, partition, args, overall_pbar, slot)
    except Exception as e:
        # Worker-thread boundary: report instead of losing the error inside
        # a future nobody inspects.
        overall_pbar.write(f"ERROR: Migration of {row.get('source_bucket')} failed: {e}")
    finally:
        overall_pbar.update(1)
        display_manager.release_slot(slot)


def _migrate_bucket(row, sts_master, partition, args, overall_pbar, slot):
    """Inventory and copy one bucket; progress bookkeeping is the caller's job."""
    start_time = time.perf_counter()
    s_acc, s_reg, s_bucket = row['source_account_id'], row['source_region'], row['source_bucket']
    d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket']

    # Default to archive/<source account> when the path column is blank or
    # missing (a missing column can come through as None).
    d_path = ((row.get('destination_path') or '').strip() or f"archive/{s_acc}").rstrip('/') + '/'

    src_session = get_assumed_session(sts_master, partition, s_acc, s_reg, args.role_name)
    dest_session = get_assumed_session(sts_master, partition, d_acc, d_reg, args.role_name)
    if not src_session or not dest_session:
        overall_pbar.write(f"ERROR: Could not assume roles for Acc {s_acc}")
        return

    src_s3, dest_s3 = src_session.client('s3'), dest_session.client('s3')
    MARKER_FILE = "archived"

    # A marker object in the source bucket means it was already processed.
    try:
        src_s3.head_object(Bucket=s_bucket, Key=MARKER_FILE)
    except ClientError:
        pass
    else:
        overall_pbar.write(f"SKIP: {s_bucket} in {s_reg} already archived.")
        return

    # Inventory: every object except the marker itself.
    objects, total_bytes = [], 0
    paginator = src_s3.get_paginator('list_objects_v2')
    try:
        for page in paginator.paginate(Bucket=s_bucket):
            for obj in page.get('Contents', []):
                if obj['Key'] != MARKER_FILE:
                    objects.append(obj)
                    total_bytes += obj['Size']
    except Exception as e:
        overall_pbar.write(f"ERROR: Failed to list {s_bucket}: {e}")
        return

    if not objects:
        return

    # Per-bucket bar, drawn on the row reserved for this worker.
    desc = f"Bucket: {s_bucket[:15]}... ({format_size(total_bytes)})"
    results = []
    with tqdm(total=len(objects), desc=desc, unit="obj", leave=False, position=slot) as pbar:
        for obj in objects:
            if args.dry_run:
                results.append(True)
            else:
                results.append(s3_copy_with_backoff(
                    dest_s3, {'Bucket': s_bucket, 'Key': obj['Key']}, d_bucket, f"{d_path}{obj['Key']}"
                ))
            pbar.update(1)

    if args.dry_run:
        overall_pbar.write(f"DRY RUN: {s_bucket} ({s_reg}) -> {len(objects)} objects found.")
        return

    failed = sum(1 for ok in results if not ok)
    if failed:
        # Do not write the marker: the bucket must be retried on the next run.
        overall_pbar.write(f"ERROR: {failed} of {len(objects)} objects failed to copy from {s_bucket}; marker not written.")
        return

    elapsed = time.perf_counter() - start_time
    src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="archived")
    report = {
        "version": __version__, "source_account": s_acc, "size": format_size(total_bytes),
        "objects": len(objects), "seconds": round(elapsed, 2)
    }
    # NOTE(review): the file name has no bucket component, so two buckets in
    # the same account and region overwrite each other's report — confirm
    # whether that is intended.
    with open(f"migration_{s_acc}_{s_reg}.json", "w") as f:
        json.dump(report, f, indent=4)

def main():
    """Entry point: read the CSV and migrate its buckets concurrently.

    Prints a banner, resolves the ARN partition from the caller identity,
    loads the input rows and runs ``process_bucket`` for each row on a
    thread pool of ``args.threads`` workers. Fatal setup problems (identity
    lookup, missing input file) are reported and end the run early.
    """
    args = get_args()

    print("=" * 80)
    print(f"S3 ORG MIGRATOR v{__version__} | Region: {args.region} | Threads: {args.threads}")
    print(f"Role: {args.role_name} | Input: {args.input} | Dry Run: {args.dry_run}")
    print("=" * 80)

    master_session = boto3.Session(profile_name=args.profile, region_name=args.region)
    sts_master = master_session.client('sts')

    # The partition is the second ARN field of the caller identity.
    try:
        partition = sts_master.get_caller_identity()['Arn'].split(':')[1]
    except Exception as e:
        print(f"FATAL: Could not verify master identity: {e}")
        return

    try:
        # newline='' is what the csv module asks for when reading files.
        with open(args.input, 'r', newline='') as f:
            rows = list(csv.DictReader(f))
    except FileNotFoundError:
        print(f"FATAL: Input file '{args.input}' not found.")
        return

    # One display slot per worker thread.
    display_manager = DisplayManager(max_slots=args.threads)

    with tqdm(total=len(rows), desc="OVERALL PROGRESS", position=0) as overall_pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
            futures = {
                executor.submit(process_bucket, row, sts_master, partition, args, overall_pbar, display_manager): row
                for row in rows
            }
            for future in concurrent.futures.as_completed(futures):
                # Surface exceptions that would otherwise stay hidden in the future.
                error = future.exception()
                if error is not None:
                    overall_pbar.write(f"ERROR: Task for {futures[future].get('source_bucket')} failed: {error}")

    # Move the cursor below the rows used by the per-bucket bars.
    print("\n" * (args.threads + 1) + "Migration Task Complete.")

if __name__ == "__main__":
main()

0 comments on commit bff2c4f

Please sign in to comment.