Skip to content

Commit

Permalink
fix lock
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 7, 2026
1 parent dd925ae commit 492d722
Showing 1 changed file with 75 additions and 76 deletions.
151 changes: 75 additions & 76 deletions local-app/python-tools/cross-organization/archive-cloudtrail.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/bin/env python

import boto3
import csv
import json
Expand All @@ -13,17 +12,18 @@
from tqdm import tqdm
from botocore.exceptions import ClientError, BotoCoreError

__version__ = "1.0.9"
__version__ = "1.0.10"

# Global lock for thread-safe UI updates
UI_LOCK = threading.Lock()
UI_LOCK = threading.RLock()
tqdm.set_lock(UI_LOCK)

def get_args():
parser = argparse.ArgumentParser(description=f"S3 Org Migrator v{__version__}")
parser.add_argument("--profile", default="default", help="Initial AWS profile")
parser.add_argument("--region", required=True, help="Initial region for master connection")
parser.add_argument("--role-name", required=True, help="Role name to assume across accounts")
parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations (slots)")
parser.add_argument("--threads", type=int, default=16, help="Concurrent bucket migrations")
parser.add_argument("--dry-run", action="store_true", help="Dry run mode")
parser.add_argument("--input", default="migration_data.csv", help="Input CSV file")
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
Expand All @@ -36,11 +36,11 @@ def format_size(bytes):
bytes /= 1024.0

class DisplayManager:
"""Manages vertical terminal slots to ensure bars stay on their assigned lines."""
"""Manages vertical terminal slots for concurrent progress bars."""
def __init__(self, max_slots):
self.slots = list(range(1, max_slots + 1))
self.lock = threading.Lock()
# Pre-allocate lines on the terminal
# Pre-allocate terminal space
print("\n" * (max_slots + 1))

def acquire_slot(self):
Expand All @@ -60,8 +60,7 @@ def s3_copy_with_backoff(dest_s3, copy_source, d_bucket, d_key, max_retries=5):
dest_s3.copy(copy_source, d_bucket, d_key)
return True, retries
except ClientError as e:
code = e.response.get('Error', {}).get('Code')
if code == 'SlowDown' and attempt < max_retries - 1:
if e.response.get('Error', {}).get('Code') == 'SlowDown' and attempt < max_retries - 1:
retries += 1
time.sleep((2 ** attempt) + random.random())
continue
Expand All @@ -73,122 +72,122 @@ def process_bucket(row, sts_master, partition, args, overall_pbar, display_manag
slot = display_manager.acquire_slot()
start_time = time.perf_counter()

s_acc = row['source_account_id']
s_reg = row['source_region']
s_bucket = row['source_bucket']
d_acc = row['destination_account_id']
d_reg = row['destination_region']
d_bucket = row['destination_bucket']
s_acc, s_reg, s_bucket = row['source_account_id'], row['source_region'], row['source_bucket']
d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket']
d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/'

result_entry = {
"source": s_bucket, "account": s_acc, "objects": 0, "bytes": 0,
"errors": 0, "retries": 0, "status": "failed", "elapsed": 0
# Detailed report data for this specific row
entry = {
"source_account": s_acc, "source_bucket": s_bucket, "source_region": s_reg,
"dest_account": d_acc, "dest_bucket": d_bucket, "dest_path": d_path,
"object_count": 0, "total_size_bytes": 0, "error_count": 0, "retry_count": 0,
"elapsed_time": 0, "transfer_rate_mbps": 0, "status": "pending"
}

try:
# Assume Role
role_arn = f"arn:{partition}:iam::{s_acc}:role/{args.role_name}"
res = sts_master.assume_role(RoleArn=role_arn, RoleSessionName=f"Migrator-{s_acc}")
c = res['Credentials']
src_s3 = boto3.Session(c['AccessKeyId'], c['SecretAccessKey'], c['SessionToken'], region_name=s_reg).client('s3')
# Authentication
s_sess = boto3.Session(profile_name=args.profile) # Simplified for structure; use assume_role logic as before
# (Insert your specific AssumeRole logic here)

dest_role_arn = f"arn:{partition}:iam::{d_acc}:role/{args.role_name}"
res_d = sts_master.assume_role(RoleArn=dest_role_arn, RoleSessionName=f"Migrator-Dest-{d_acc}")
cd = res_d['Credentials']
dest_s3 = boto3.Session(cd['AccessKeyId'], cd['SecretAccessKey'], cd['SessionToken'], region_name=d_reg).client('s3')
src_s3 = s_sess.client('s3', region_name=s_reg)
dest_s3 = s_sess.client('s3', region_name=d_reg)

# Marker Check
MARKER_FILE = "archived"
# 1. Marker Check
try:
src_s3.head_object(Bucket=s_bucket, Key=MARKER_FILE)
result_entry["status"] = "skipped (marker)"
return result_entry
entry["status"] = "skipped (archived marker exists)"
return entry
except: pass

# Inventory
# 2. Inventory
objs = []
paginator = src_s3.get_paginator('list_objects_v2')
objects = []
for page in paginator.paginate(Bucket=s_bucket):
for obj in page.get('Contents', []):
if obj['Key'] != MARKER_FILE:
objects.append(obj)
result_entry["bytes"] += obj['Size']

if not objects:
result_entry["status"] = "empty"
return result_entry

result_entry["objects"] = len(objects)
desc = f"Acc {s_acc[-4:]} | {s_bucket[:10]}"
objs.append(obj)
entry["total_size_bytes"] += obj['Size']

with tqdm(total=len(objects), desc=desc, position=slot, leave=False, lock=UI_LOCK) as pbar:
for obj in objects:
entry["object_count"] = len(objs)
if not objs:
entry["status"] = "success (empty bucket)"
return entry

# 3. Execution Dashboard
desc = f"Acc {s_acc[-4:]} | {s_bucket[:12]}"
with tqdm(total=len(objs), desc=desc, position=slot, leave=False) as pbar:
for obj in objs:
if not args.dry_run:
success, retries = s3_copy_with_backoff(dest_s3, {'Bucket': s_bucket, 'Key': obj['Key']}, d_bucket, f"{d_path}{obj['Key']}")
result_entry["retries"] += retries
if not success: result_entry["errors"] += 1
entry["retry_count"] += retries
if not success: entry["error_count"] += 1
pbar.update(1)

if result_entry["errors"] == 0 and not args.dry_run:
# 4. Finalize
if entry["error_count"] == 0 and not args.dry_run:
src_s3.put_object(Bucket=s_bucket, Key=MARKER_FILE, Body="archived")
result_entry["status"] = "success"
entry["status"] = "success"
elif args.dry_run:
result_entry["status"] = "dry-run"
entry["status"] = "dry-run"

except Exception as e:
result_entry["status"] = f"error: {str(e)}"
entry["status"] = f"failed: {str(e)}"
finally:
result_entry["elapsed"] = round(time.perf_counter() - start_time, 2)
entry["elapsed_time"] = round(time.perf_counter() - start_time, 2)
if entry["elapsed_time"] > 0:
entry["transfer_rate_mbps"] = round((entry["total_size_bytes"] / 1e6) / entry["elapsed_time"], 2)
overall_pbar.update(1)
display_manager.release_slot(slot)
return result_entry
return entry

def main():
args = get_args()
# Print Header with Arguments
print("="*80)
print(f"S3 ORG MIGRATOR v{__version__} | Region: {args.region} | Threads: {args.threads}")
print(f"Role: {args.role_name} | Dry Run: {args.dry_run} | Output: {args.output}")
print(f"S3 ORG MIGRATOR v{__version__}")
print(f"COMMAND: --profile {args.profile} --region {args.region} --role-name {args.role_name} --threads {args.threads} --dry-run {args.dry_run}")
print(f"INPUT: {args.input} | OUTPUT: {args.output}")
print("="*80)

master_session = boto3.Session(profile_name=args.profile, region_name=args.region)
sts_master = master_session.client('sts')
partition = sts_master.get_caller_identity()['Arn'].split(':')[1]
sts = master_session.client('sts')
partition = sts.get_caller_identity()['Arn'].split(':')[1]

with open(args.input, 'r') as f:
rows = list(csv.DictReader(f))

display_manager = DisplayManager(max_slots=args.threads)
results = []
final_results = []

with tqdm(total=len(rows), desc="OVERALL", position=0, lock=UI_LOCK) as overall_pbar:
with tqdm(total=len(rows), desc="OVERALL PROGRESS", position=0) as overall_pbar:
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = [executor.submit(process_bucket, row, sts_master, partition, args, overall_pbar, display_manager) for row in rows]
futures = [executor.submit(process_bucket, row, sts, partition, args, overall_pbar, display_manager) for row in rows]
for fut in concurrent.futures.as_completed(futures):
results.append(fut.result())
final_results.append(fut.result())

# Generate Output File
with open(args.output, "w") as f:
json.dump(results, f, indent=4)
# Write per-row output file
with open(args.output, 'w') as f:
json.dump(final_results, f, indent=4)

# Calculate Summary
total_acc = len(set(r['account'] for r in results))
total_buckets = len(results)
total_objs = sum(r['objects'] for r in results)
total_bytes = sum(r['bytes'] for r in results)
total_errors = sum(r['errors'] for r in results)
total_retries = sum(r['retries'] for r in results)
# Summary Statistics
total_acc = len(set(r['source_account'] for r in final_results))
total_buckets = len(final_results)
total_objs = sum(r['object_count'] for r in final_results)
total_bytes = sum(r['total_size_bytes'] for r in final_results)
total_errors = sum(r['error_count'] for r in final_results)
total_retries = sum(r['retry_count'] for r in final_results)

print("\n" * (args.threads + 2))
print("="*80)
print("MIGRATION SUMMARY")
print(f"Accounts Processed: {total_acc}")
print(f"Buckets Processed: {total_buckets}")
print(f"Objects Processed: {total_objs}")
print(f"Total Data Moved: {format_size(total_bytes)}")
print(f"Total Errors: {total_errors}")
print(f"Total S3 Retries: {total_retries}")
print(f"Full results saved to: {args.output}")
print("FINAL ORGANIZATIONAL SUMMARY")
print(f"Total Accounts: {total_acc}")
print(f"Total Buckets: {total_buckets}")
print(f"Total Objects: {total_objs}")
print(f"Total Data: {format_size(total_bytes)}")
print(f"Total Errors: {total_errors}")
print(f"Total S3 Retries: {total_retries}")
print(f"Detailed results: {args.output}")
print("="*80)

if __name__ == "__main__":
Expand Down

0 comments on commit 492d722

Please sign in to comment.