From e71b1ba67e4c9ffefcadcab3644c74e5f405ef53 Mon Sep 17 00:00:00 2001 From: badra001 Date: Thu, 15 Jan 2026 09:58:26 -0500 Subject: [PATCH] fix alias --- .../cross-organization/tag-checker.py | 124 ++++++++---------- 1 file changed, 52 insertions(+), 72 deletions(-) diff --git a/local-app/python-tools/cross-organization/tag-checker.py b/local-app/python-tools/cross-organization/tag-checker.py index aa87aa0b..3f25ce72 100755 --- a/local-app/python-tools/cross-organization/tag-checker.py +++ b/local-app/python-tools/cross-organization/tag-checker.py @@ -13,7 +13,7 @@ from botocore.exceptions import ClientError from tqdm import tqdm -__version__ = "1.1.0" +__version__ = "1.1.1" def get_args(): parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}") @@ -22,10 +22,10 @@ def get_args(): parser.add_argument("--profile", required=True, help="AWS CLI profile for Management Account") parser.add_argument("--tags-file", required=True, help="CSV file with Tag Key in the first column") parser.add_argument("--max-workers", type=int, default=8, help="Max concurrent account scans (default: 8)") - parser.add_argument("--account-regex", help="Regex to filter accounts by alias/name") - parser.add_argument("--accounts-from", help="File containing specific Account IDs to process") + parser.add_argument("--account-regex", help="Regex to filter accounts by alias") + parser.add_argument("--accounts-from", help="File of Account IDs to process (one per line)") parser.add_argument("--output", default="tag_checker_findings", help="Prefix for output files") - parser.add_argument("--limit", type=int, default=0, help="Hard limit on total accounts to scan") + parser.add_argument("--limit", type=int, default=0, help="Limit total accounts processed") return parser.parse_args() def get_session(management_session, account_id, role_name, partition): @@ -39,24 +39,28 @@ def get_session(management_session, account_id, role_name, partition): aws_session_token=c['SessionToken']) except: return None -def get_alias_fixed(session): - """Accurately retrieves the IAM alias from the member account.""" - try: - return session.client('iam').list_account_aliases()['AccountAliases'][0] - except: return "No Alias" - -def scan_account(account, management_session, role_name, partition, tag_keys, region_name, position): - acc_id, acc_name = account['Id'], account['Name'] +def scan_account(account, management_session, role_name, partition, tag_keys, region_name, lane_id, account_regex): + acc_id = account['Id'] m_session = get_session(management_session, acc_id, role_name, partition) if not m_session: - return [], acc_id, "AUTH_FAIL", 0 + return [], acc_id, "N/A", f"Skipped: Cannot assume role" + + # CRITICAL: Retrieve the true alias from the target account + try: + alias_list = m_session.client('iam').list_account_aliases().get('AccountAliases', []) + alias = alias_list[0] if alias_list else "N/A" + except Exception: + alias = "N/A" - alias = get_alias_fixed(m_session) + # Regex Filter check + if account_regex and not re.search(account_regex, alias, re.IGNORECASE): + return [], acc_id, alias, f"Skipped: Regex mismatch ({alias})" + findings = [] - - # Inner progress bar (The "Lane") - pbar = tqdm(total=len(tag_keys), desc=f"Lane {position}: {acc_id}", position=position, leave=False) + # Progress bar with Account ID and Alias + pbar = tqdm(total=len(tag_keys), desc=f"Lane {lane_id} | {acc_id} ({alias})", + position=lane_id, leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}') try: ec2 = m_session.client('ec2', region_name=region_name) @@ -80,93 +84,69 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re pbar.update(1) pbar.close() - return findings, acc_id, alias, len(findings) + return findings, acc_id, alias, f"Completed: {len(findings)} hits" def main(): args = get_args() ts = datetime.now().strftime("%Y%m%d_%H%M%S") - start_time = time.time() + start_overall = time.time() - # 1. Initialization session = boto3.Session(profile_name=args.profile) org = session.client('organizations') partition = session.client('sts').get_caller_identity()['Arn'].split(':')[1] - # 2. Pre-scan Summary - print(f"\n{'='*50}\nAWS TAG CHECKER v{__version__}\n{'='*50}") - print(f"Profile: {args.profile} | Region: {args.region} | Role: {args.role_name}") - - # 3. Load Tags and Account Filters with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: - tag_keys = [row[0].strip().replace('"', '') for row in list(csv.reader(f))[1:] if row] + tag_keys = [row[0].strip() for row in list(csv.reader(f))[1:] if row] - allowed_ids = [] + target_ids = [] if args.accounts_from: with open(args.accounts_from, 'r') as f: - allowed_ids = [line.strip() for line in f if line.strip()] + target_ids = [l.strip() for l in f if l.strip()] - # 4. Fetch and Filter Accounts - all_raw_accounts = [] + print(f"\n{'='*60}\nAWS TAG CHECKER v{__version__}\n{'='*60}") + print(f"Profile: {args.profile} | Region: {args.region} | Role: {args.role_name}") + + # Get all accounts from Org + all_accs = [] paginator = org.get_paginator('list_accounts') for page in paginator.paginate(): - all_raw_accounts.extend(page['Accounts']) + for a in page['Accounts']: + if a['Status'] == 'ACTIVE': + if not target_ids or a['Id'] in target_ids: + all_accs.append(a) + + if args.limit > 0: all_accs = all_accs[:args.limit] + print(f"Workers: {args.max_workers} | Tags: {len(tag_keys)} | Targeted Accounts: {len(all_accs)}") + print(f"{'='*60}\n") - to_process = [] - for acc in all_raw_accounts: - if acc['Status'] != 'ACTIVE': continue - if allowed_ids and acc['Id'] not in allowed_ids: continue - - # Temp check for Regex (Requires Alias which we fetch inside threads, - # so here we check Org Name first, then Alias in-thread) - if args.account_regex and not re.search(args.account_regex, acc['Name']): - # We'll re-verify Alias inside the thread for regex accuracy - pass - to_process.append(acc) - - if args.limit > 0: to_process = to_process[:args.limit] - print(f"Accounts Found: {len(all_raw_accounts)} | Targeted: {len(to_process)}") - print(f"{'='*50}\n") - - # 5. Multi-threaded Execution all_findings = [] - summary_data = [] - - # Overall Progress Bar - overall_pbar = tqdm(total=len(to_process), desc="Total Progress", position=0) + overall_pbar = tqdm(total=len(all_accs), desc="Total Org Progress", position=0) with ThreadPoolExecutor(max_workers=args.max_workers) as executor: - # Use a map to track positions (lanes 1 through max_workers) + # Lanes 1 through max_workers futures = {executor.submit(scan_account, acc, session, args.role_name, partition, - tag_keys, args.region, (i % args.max_workers) + 1): acc - for i, acc in enumerate(to_process)} + tag_keys, args.region, (i % args.max_workers) + 1, + args.account_regex): acc for i, acc in enumerate(all_accs)} for future in as_completed(futures): - res, acc_id, alias, count = future.result() - - # Post-thread Regex filtering for Alias accuracy - if args.account_regex and not re.search(args.account_regex, alias): - overall_pbar.write(f"[-] Skipped {acc_id} ({alias}): Regex mismatch") + res, acc_id, alias, status = future.result() + if "Skipped" in status or "Skipping" in status: + overall_pbar.write(f"[-] {acc_id}: {status}") else: all_findings.extend(res) - summary_data.append({"account_id": acc_id, "alias": alias, "hits": count}) - overall_pbar.update(1) overall_pbar.close() + print("\n" * (args.max_workers + 1)) # Clear lanes - # 6. Output Files - csv_file = f"{args.output}_{ts}.csv" - json_file = f"{args.output}_{ts}.json" - - with open(json_file, 'w') as f: json.dump(all_findings, f, indent=4) + csv_out = f"{args.output}_{ts}.csv" if all_findings: - with open(csv_file, 'w', newline='') as f: + with open(csv_out, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=all_findings[0].keys()) - writer.writeheader() - writer.writerows(all_findings) + writer.writeheader(); writer.writerows(all_findings) - print(f"\n[+] Done! Scanned {len(summary_data)} accounts in {round(time.time()-start_time, 2)}s") - print(f"[+] Findings: {csv_file}") + print(f"[+] Scan Complete in {round(time.time()-start_overall, 2)}s") + print(f"[+] Findings saved to: {csv_out}") if __name__ == "__main__": main()