diff --git a/local-app/python-tools/cross-organization/archive.py b/local-app/python-tools/cross-organization/archive.py index 0d5826ea..30f85763 100755 --- a/local-app/python-tools/cross-organization/archive.py +++ b/local-app/python-tools/cross-organization/archive.py @@ -13,17 +13,17 @@ from tqdm import tqdm from botocore.exceptions import ClientError -__version__ = "1.0.13" +__version__ = "1.0.14" -# Setup Logging to File +# Setup Enhanced Logging +log_filename = f"migration_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" logging.basicConfig( - filename='archive.log', + filename=log_filename, level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s' ) logger = logging.getLogger(__name__) -# UI Synchronization UI_LOCK = threading.RLock() tqdm.set_lock(UI_LOCK) @@ -36,24 +36,20 @@ def get_args(): parser.add_argument("--dry-run", action="store_true", help="Simulate only") parser.add_argument("--verbose", action="store_true", help="Print detailed operation steps") parser.add_argument("--input", default="migration_data.csv", help="Input CSV") - ts = datetime.now().strftime("%Y%m%d_%H%M%S") - parser.add_argument("--output", default=f"results_{ts}.json", help="Summary JSON") return parser.parse_args() -def log_v(msg, verbose=False): - """Logs to file and optionally to console if verbose is enabled.""" - logger.info(msg) - if verbose: +def log_v(msg, verbose=False, level="INFO"): + if level == "ERROR": logger.error(msg) + else: logger.info(msg) + if verbose or level == "ERROR": with UI_LOCK: - tqdm.write(f"[VERBOSE] {msg}") + tqdm.write(f"[{level}] {msg}") -def get_client(sts, partition, acc, reg, role_name, service='s3'): - log_v(f"Assuming role in {acc} for {service}...") +def get_client(sts, partition, acc, reg, role_name): role_arn = f"arn:{partition}:iam::{acc}:role/{role_name}" - res = sts.assume_role(RoleArn=role_arn, RoleSessionName="S3MigrationV13") + res = sts.assume_role(RoleArn=role_arn, RoleSessionName="S3MigratorV14") c = res['Credentials'] - return boto3.client( - service, + return boto3.client('s3', aws_access_key_id=c['AccessKeyId'], aws_secret_access_key=c['SecretAccessKey'], aws_session_token=c['SessionToken'], @@ -66,69 +62,63 @@ def process_bucket(row, sts_master, partition, args, overall_pbar, slots): d_acc, d_reg, d_bucket = row['destination_account_id'], row['destination_region'], row['destination_bucket'] d_path = (row.get('destination_path', '').strip() or f"archive/{s_acc}").rstrip('/') + '/' - entry = {"account": s_acc, "bucket": s_bucket, "status": "failed", "objects": 0} - log_v(f"Starting Task: Source {s_bucket} ({s_acc}) -> Dest {d_bucket} ({d_acc})", args.verbose) - try: + log_v(f"Task Start: {s_bucket} ({s_acc})", args.verbose) src_s3 = get_client(sts_master, partition, s_acc, s_reg, args.role_name) dest_s3 = get_client(sts_master, partition, d_acc, d_reg, args.role_name) - # 1. Check for marker + # Marker Check try: - log_v(f"Checking for marker file 'archived' in {s_bucket}...", args.verbose) src_s3.head_object(Bucket=s_bucket, Key="archived") - entry["status"] = "skipped" - log_v(f"Bucket {s_bucket} already archived. Skipping.", args.verbose) - return entry + log_v(f"Skipping {s_bucket}: Already archived.", args.verbose) + return except ClientError as e: if e.response['Error']['Code'] == '403': - log_v(f"CRITICAL: 403 on HeadObject for {s_bucket}/archived. Check s3:ListBucket + s3:GetObject permissions.", True) - raise e + log_v(f"403 on marker check for {s_bucket}/archived. Likely missing s3:ListBucket.", True, "ERROR") + pass - # 2. Inventory - log_v(f"Listing objects in {s_bucket}...", args.verbose) + # Inventory objs = [] paginator = src_s3.get_paginator('list_objects_v2') for page in paginator.paginate(Bucket=s_bucket): objs.extend(page.get('Contents', [])) - - entry["objects"] = len(objs) - log_v(f"Found {len(objs)} objects in {s_bucket}.", args.verbose) - # 3. Copy with Progress with tqdm(total=len(objs), desc=f"Acc {s_acc[-4:]}", position=slot, leave=False) as pbar: for o in objs: - if not args.dry_run: - dest_s3.copy({'Bucket': s_bucket, 'Key': o['Key']}, d_bucket, f"{d_path}{o['Key']}") - pbar.update(1) - - if not args.dry_run: + try: + if not args.dry_run: + dest_s3.copy({'Bucket': s_bucket, 'Key': o['Key']}, d_bucket, f"{d_path}{o['Key']}") + pbar.update(1) + except ClientError as e: + if e.response['Error']['Code'] == '403': + log_v(f"403 FORBIDDEN on Key: {o['Key']} in Bucket: {s_bucket}. " + f"Check Object Ownership/ACLs or KMS Decrypt permissions.", True, "ERROR") + raise e + + if not args.dry_run and objs: src_s3.put_object(Bucket=s_bucket, Key="archived", Body="archived") - entry["status"] = "success" except Exception as e: - log_v(f"TASK FAILED: {s_bucket} error: {str(e)}", True) - entry["status"] = f"error: {str(e)}" + log_v(f"CRITICAL FAIL: {s_bucket} -> {str(e)}", True, "ERROR") finally: overall_pbar.update(1) slots.release(slot) - return entry class SlotManager: def __init__(self, n): self.s = list(range(1, n + 1)) self.l = threading.Lock() - for _ in range(n + 2): print("") + print("\n" * (n + 2)) print(f"\033[{n + 2}A", end="") def acquire(self): with self.l: return self.s.pop(0) if self.s else None def release(self, i): if i: - with self.l: self.s.append(i); self.s.sort() + with self.l: self.s.insert(0, i); self.s.sort() def main(): args = get_args() - log_v(f"--- SESSION START: v{__version__} ---") + log_v(f"--- Migration Initiated v{__version__} ---") master_sess = boto3.Session(profile_name=args.profile, region_name=args.region) sts = master_sess.client('sts') @@ -136,16 +126,13 @@ def main(): with open(args.input, 'r') as f: rows = list(csv.DictReader(f)) slots = SlotManager(args.threads) - results = [] with tqdm(total=len(rows), desc="OVERALL", position=0) as opbar: with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as ex: - futs = [ex.submit(process_bucket, r, sts, partition, args, opbar, slots) for r in rows] - for f in concurrent.futures.as_completed(futs): results.append(f.result()) + [ex.submit(process_bucket, r, sts, partition, args, opbar, slots) for r in rows] print("\n" * (args.threads + 2)) - with open(args.output, 'w') as f: json.dump(results, f, indent=4) - log_v(f"--- SESSION END: {len(results)} rows processed ---") + log_v(f"--- Migration Finished. Log: {log_filename} ---") if __name__ == "__main__": main()