diff --git a/local-app/python-tools/cross-organization/tag-checker.py b/local-app/python-tools/cross-organization/tag-checker.py index 4f7d6b46..3e39efec 100755 --- a/local-app/python-tools/cross-organization/tag-checker.py +++ b/local-app/python-tools/cross-organization/tag-checker.py @@ -5,9 +5,11 @@ import json import argparse import sys +import time +from datetime import datetime from botocore.exceptions import ClientError -__version__ = "1.0.2" +__version__ = "1.0.3" def get_args(): parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}") @@ -30,8 +32,7 @@ def get_session_for_account(management_session, account_id, role_name, partition aws_secret_access_key=response['Credentials']['SecretAccessKey'], aws_session_token=response['Credentials']['SessionToken'] ) - except ClientError as e: - print(f" [!] Could not assume role for {account_id}: {e}") + except ClientError: return None def get_account_alias(session): @@ -40,111 +41,107 @@ def get_account_alias(session): aliases = iam.list_account_aliases()['AccountAliases'] return aliases[0] if aliases else "No Alias" except ClientError: - return "Unknown/Unauthorized" + return "Unknown" def main(): args = get_args() - print(f"[*] Initializing Tag Scanner v{__version__}") + start_time_overall = time.time() # Initialize Management Session - try: - session = boto3.Session(profile_name=args.profile) - org_client = session.client('organizations') - sts_client = session.client('sts') - identity = sts_client.get_caller_identity() - partition = identity['Arn'].split(':')[1] - except Exception as e: - print(f"[!] Initialization failed: {e}") - sys.exit(1) + session = boto3.Session(profile_name=args.profile) + org_client = session.client('organizations') + partition = session.client('sts').get_caller_identity()['Arn'].split(':')[1] - # Load Tag Keys from CSV (First Column) + # Load Tag Keys tag_keys = [] - try: - with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: - reader = csv.reader(f) - next(reader) # Skip Header: Tag Key,Status,Type,Last updated date,Last used month - for row in reader: - if row: - # Strip quotes and whitespace - tag_keys.append(row[0].strip().replace('"', '').replace("'", "")) - except Exception as e: - print(f"[!] Error reading tags file: {e}") - sys.exit(1) + with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: + reader = csv.reader(f) + next(reader) # Skip header + tag_keys = [row[0].strip() for row in reader if row] - if not tag_keys: - print("[!] No tag keys found in the input file.") - sys.exit(0) - - print(f"[*] Loaded {len(tag_keys)} keys. Starting cross-account scan...") + print(f"[*] Starting Scan v{__version__} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") all_results = [] - - # Iterate Accounts + account_metrics = [] + total_resources_found = 0 + paginator = org_client.get_paginator('list_accounts') for page in paginator.paginate(): for account in page['Accounts']: - if account['Status'] != 'ACTIVE': - continue - - acc_id = account['Id'] - acc_name = account['Name'] - print(f"[*] Scanning Account: {acc_id} ({acc_name})") + if account['Status'] != 'ACTIVE': continue + + acc_start_time = time.time() + acc_id, acc_name = account['Id'], account['Name'] + print(f" --> Processing {acc_id} ({acc_name})...", end="\r") m_session = get_session_for_account(session, acc_id, args.role_name, partition) if not m_session: + print(f" [!] Skipped {acc_id}: Access Denied") continue alias = get_account_alias(m_session) - - # Get all enabled regions for this account + resources_in_account = 0 + + # Regional Scan try: ec2 = m_session.client('ec2', region_name=args.region) regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] except ClientError: - regions = [args.region] # Fallback to provided region - + regions = [args.region] + for region in regions: tag_client = m_session.client('resourcegroupstaggingapi', region_name=region) tag_paginator = tag_client.get_paginator('get_resources') - # Tagging API allows up to 50 filters per call - tag_filters = [{'Key': k} for k in tag_keys] - - try: - for i in range(0, len(tag_filters), 50): - chunk = tag_filters[i:i+50] - for tag_page in tag_paginator.paginate(TagFilters=chunk): - for r_mapping in tag_page.get('ResourceTagMappingList', []): - arn = r_mapping['ResourceARN'] - # Find matching keys - found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys] - - for tag in found_tags: - all_results.append({ - "tag_name": tag['Key'], - "tag_value": tag['Value'], - "account_id": acc_id, - "account_alias": alias, - "region": region, - "arn": arn - }) - except Exception as e: - print(f" [!] Error in {region}: {e}") + # We filter by 50 tags per API call (AWS limit) + for i in range(0, len(tag_keys), 50): + chunk = [{'Key': k} for k in tag_keys[i:i+50]] + for tag_page in tag_paginator.paginate(TagFilters=chunk): + for r_mapping in tag_page.get('ResourceTagMappingList', []): + resources_in_account += 1 + arn = r_mapping['ResourceARN'] + found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys] + for tag in found_tags: + all_results.append({ + "tag_name": tag['Key'], "tag_value": tag['Value'], + "account_id": acc_id, "account_alias": alias, + "region": region, "arn": arn + }) - # Output to JSON - with open('findings.json', 'w') as jf: - json.dump(all_results, jf, indent=4) - - # Output to CSV + acc_elapsed = time.time() - acc_start_time + total_resources_found += resources_in_account + account_metrics.append({ + "account_id": acc_id, + "account_name": acc_name, + "account_alias": alias, + "resources_with_target_tags": resources_in_account, + "elapsed_seconds": round(acc_elapsed, 2) + }) + + # Summary Generation + total_elapsed = time.time() - start_time_overall + summary = { + "version": __version__, + "timestamp": datetime.now().isoformat(), + "total_accounts_scanned": len(account_metrics), + "total_target_resources_found": total_resources_found, + "total_elapsed_seconds": round(total_elapsed, 2), + "average_seconds_per_account": round(total_elapsed / len(account_metrics), 2) if account_metrics else 0, + "account_details": account_metrics + } + + # Save Outputs + with open('findings.json', 'w') as f: json.dump(all_results, f, indent=4) + with open('summary_metrics.json', 'w') as f: json.dump(summary, f, indent=4) if all_results: - headers = ["tag_name", "tag_value", "account_id", "account_alias", "region", "arn"] - with open('findings.csv', 'w', newline='') as cf: - writer = csv.DictWriter(cf, fieldnames=headers) - writer.writeheader() - writer.writerows(all_results) - - print(f"\n[+] Scan Complete. Found {len(all_results)} instances.") - print("[+] Files generated: findings.json, findings.csv") + with open('findings.csv', 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=all_results[0].keys()) + writer.writeheader(); writer.writerows(all_results) + + print("\n" + "="*40) + print(f"SCAN SUMMARY (v{__version__})") + print(f"Total Accounts: {summary['total_accounts_scanned']}") + print(f"Total Resources: {summary['total_target_resources_found']}") + print(f"Total Time: {summary['total_elapsed_seconds']}s") + print("="*40) -if __name__ == "__main__": - main()