Skip to content

Commit

Permalink
extend logging
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 7, 2026
1 parent e44b786 commit eb637a8
Showing 1 changed file with 36 additions and 49 deletions.
85 changes: 36 additions & 49 deletions local-app/python-tools/cross-organization/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@
from tqdm import tqdm
from botocore.exceptions import ClientError

__version__ = "1.0.13"
__version__ = "1.0.14"

# Setup Logging to File
# Setup Enhanced Logging
log_filename = f"migration_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
filename='archive.log',
filename=log_filename,
level=logging.INFO,
format='%(asctime)s | %(levelname)s | %(message)s'
)
logger = logging.getLogger(__name__)

# UI Synchronization
UI_LOCK = threading.RLock()
tqdm.set_lock(UI_LOCK)

Expand All @@ -36,24 +36,20 @@ def get_args():
parser.add_argument("--dry-run", action="store_true", help="Simulate only")
parser.add_argument("--verbose", action="store_true", help="Print detailed operation steps")
parser.add_argument("--input", default="migration_data.csv", help="Input CSV")
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
parser.add_argument("--output", default=f"results_{ts}.json", help="Summary JSON")
return parser.parse_args()

def log_v(msg, verbose=False):
"""Logs to file and optionally to console if verbose is enabled."""
logger.info(msg)
if verbose:
def log_v(msg, verbose=False, level="INFO"):
if level == "ERROR": logger.error(msg)
else: logger.info(msg)
if verbose or level == "ERROR":
with UI_LOCK:
tqdm.write(f"[VERBOSE] {msg}")
tqdm.write(f"[{level}] {msg}")

def get_client(sts, partition, acc, reg, role_name, service='s3'):
log_v(f"Assuming role in {acc} for {service}...")
def get_client(sts, partition, acc, reg, role_name):
role_arn = f"arn:{partition}:iam::{acc}:role/{role_name}"
res = sts.assume_role(RoleArn=role_arn, RoleSessionName="S3MigrationV13")
res = sts.assume_role(RoleArn=role_arn, RoleSessionName="S3MigratorV14")
c = res['Credentials']
return boto3.client(
service,
return boto3.client('s3',
aws_access_key_id=c['AccessKeyId'],
aws_secret_access_key=c['SecretAccessKey'],
aws_session_token=c['SessionToken'],
Expand All @@ -66,86 +62,77 @@ def process_bucket(row, sts_master, partition, args, overall_pbar, slots):
d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket']
d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/'

entry = {"account": s_acc, "bucket": s_bucket, "status": "failed", "objects": 0}
log_v(f"Starting Task: Source {s_bucket} ({s_acc}) -> Dest {d_bucket} ({d_acc})", args.verbose)

try:
log_v(f"Task Start: {s_bucket} ({s_acc})", args.verbose)
src_s3 = get_client(sts_master, partition, s_acc, s_reg, args.role_name)
dest_s3 = get_client(sts_master, partition, d_acc, d_reg, args.role_name)

# 1. Check for marker
# Marker Check
try:
log_v(f"Checking for marker file 'archived' in {s_bucket}...", args.verbose)
src_s3.head_object(Bucket=s_bucket, Key="archived")
entry["status"] = "skipped"
log_v(f"Bucket {s_bucket} already archived. Skipping.", args.verbose)
return entry
log_v(f"Skipping {s_bucket}: Already archived.", args.verbose)
return
except ClientError as e:
if e.response['Error']['Code'] == '403':
log_v(f"CRITICAL: 403 on HeadObject for {s_bucket}/archived. Check s3:ListBucket + s3:GetObject permissions.", True)
raise e
log_v(f"403 on marker check for {s_bucket}/archived. Likely missing s3:ListBucket.", True, "ERROR")
pass

# 2. Inventory
log_v(f"Listing objects in {s_bucket}...", args.verbose)
# Inventory
objs = []
paginator = src_s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=s_bucket):
objs.extend(page.get('Contents', []))

entry["objects"] = len(objs)
log_v(f"Found {len(objs)} objects in {s_bucket}.", args.verbose)

# 3. Copy with Progress
with tqdm(total=len(objs), desc=f"Acc {s_acc[-4:]}", position=slot, leave=False) as pbar:
for o in objs:
if not args.dry_run:
dest_s3.copy({'Bucket': s_bucket, 'Key': o['Key']}, d_bucket, f"{d_path}{o['Key']}")
pbar.update(1)

if not args.dry_run:
try:
if not args.dry_run:
dest_s3.copy({'Bucket': s_bucket, 'Key': o['Key']}, d_bucket, f"{d_path}{o['Key']}")
pbar.update(1)
except ClientError as e:
if e.response['Error']['Code'] == '403':
log_v(f"403 FORBIDDEN on Key: {o['Key']} in Bucket: {s_bucket}. "
f"Check Object Ownership/ACLs or KMS Decrypt permissions.", True, "ERROR")
raise e

if not args.dry_run and objs:
src_s3.put_object(Bucket=s_bucket, Key="archived", Body="archived")
entry["status"] = "success"

except Exception as e:
log_v(f"TASK FAILED: {s_bucket} error: {str(e)}", True)
entry["status"] = f"error: {str(e)}"
log_v(f"CRITICAL FAIL: {s_bucket} -> {str(e)}", True, "ERROR")
finally:
overall_pbar.update(1)
slots.release(slot)
return entry

class SlotManager:
def __init__(self, n):
self.s = list(range(1, n + 1))
self.l = threading.Lock()
for _ in range(n + 2): print("")
print("\n" * (n + 2))
print(f"\033[{n + 2}A", end="")
def acquire(self):
with self.l: return self.s.pop(0) if self.s else None
def release(self, i):
if i:
with self.l: self.s.append(i); self.s.sort()
with self.l: self.s.insert(0, i); self.s.sort()

def main():
args = get_args()
log_v(f"--- SESSION START: v{__version__} ---")
log_v(f"--- Migration Initiated v{__version__} ---")

master_sess = boto3.Session(profile_name=args.profile, region_name=args.region)
sts = master_sess.client('sts')
partition = sts.get_caller_identity()['Arn'].split(':')[1]

with open(args.input, 'r') as f: rows = list(csv.DictReader(f))
slots = SlotManager(args.threads)
results = []

with tqdm(total=len(rows), desc="OVERALL", position=0) as opbar:
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as ex:
futs = [ex.submit(process_bucket, r, sts, partition, args, opbar, slots) for r in rows]
for f in concurrent.futures.as_completed(futs): results.append(f.result())
[ex.submit(process_bucket, r, sts, partition, args, opbar, slots) for r in rows]

print("\n" * (args.threads + 2))
with open(args.output, 'w') as f: json.dump(results, f, indent=4)
log_v(f"--- SESSION END: {len(results)} rows processed ---")
log_v(f"--- Migration Finished. Log: {log_filename} ---")

if __name__ == "__main__":
main()

0 comments on commit eb637a8

Please sign in to comment.