From a05f9017678388f40401f40ecc7dfc0b866e50e7 Mon Sep 17 00:00:00 2001 From: badra001 Date: Wed, 14 Jan 2026 12:32:22 -0500 Subject: [PATCH] add features --- .../cross-organization/tag-checker.py | 80 ++++++++++++------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/local-app/python-tools/cross-organization/tag-checker.py b/local-app/python-tools/cross-organization/tag-checker.py index 32029956..4f7d6b46 100755 --- a/local-app/python-tools/cross-organization/tag-checker.py +++ b/local-app/python-tools/cross-organization/tag-checker.py @@ -7,12 +7,14 @@ import sys from botocore.exceptions import ClientError +__version__ = "1.0.2" + def get_args(): - parser = argparse.ArgumentParser(description="Scan AWS Org for specific tag keys.") + parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}") parser.add_argument("--role-name", required=True, help="Role to assume in member accounts") - parser.add_argument("--region", required=True, help="Region to use for the API calls") - parser.add_argument("--profile", required=True, help="AWS CLI profile for the Management Account") - parser.add_argument("--tags-file", required=True, help="File with one tag key per line") + parser.add_argument("--region", required=True, help="Primary region for API initialization") + parser.add_argument("--profile", required=True, help="AWS CLI profile for Management Account") + parser.add_argument("--tags-file", required=True, help="CSV file with Tag Key in the first column") return parser.parse_args() def get_session_for_account(management_session, account_id, role_name, partition): @@ -25,7 +27,7 @@ def get_session_for_account(management_session, account_id, role_name, partition ) return boto3.Session( aws_access_key_id=response['Credentials']['AccessKeyId'], - aws_secret_access_key=response['Credentials']['AccessKeyId'], + aws_secret_access_key=response['Credentials']['SecretAccessKey'], aws_session_token=response['Credentials']['SessionToken'] ) except ClientError as e: @@ -33,31 +35,47 @@ def get_session_for_account(management_session, account_id, role_name, partition return None def get_account_alias(session): - """Returns the IAM alias if it exists, otherwise None.""" try: iam = session.client('iam') aliases = iam.list_account_aliases()['AccountAliases'] - return aliases[0] if aliases else None + return aliases[0] if aliases else "No Alias" except ClientError: - return None + return "Unknown/Unauthorized" def main(): args = get_args() + print(f"[*] Initializing Tag Scanner v{__version__}") # Initialize Management Session - session = boto3.Session(profile_name=args.profile) - org_client = session.client('organizations') - - # Get Partition and Account Info - sts_client = session.client('sts') - identity = sts_client.get_caller_identity() - partition = identity['Arn'].split(':')[1] + try: + session = boto3.Session(profile_name=args.profile) + org_client = session.client('organizations') + sts_client = session.client('sts') + identity = sts_client.get_caller_identity() + partition = identity['Arn'].split(':')[1] + except Exception as e: + print(f"[!] Initialization failed: {e}") + sys.exit(1) - # Load Tag Keys - with open(args.tags_file, 'r') as f: - tag_keys = [line.strip().replace('"', '').replace("'", "") for line in f if line.strip()] + # Load Tag Keys from CSV (First Column) + tag_keys = [] + try: + with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: + reader = csv.reader(f) + next(reader) # Skip Header: Tag Key,Status,Type,Last updated date,Last used month + for row in reader: + if row: + # Strip quotes and whitespace + tag_keys.append(row[0].strip().replace('"', '').replace("'", "")) + except Exception as e: + print(f"[!] Error reading tags file: {e}") + sys.exit(1) + + if not tag_keys: + print("[!] No tag keys found in the input file.") + sys.exit(0) - print(f"[*] Starting scan for {len(tag_keys)} tags across Organization...") + print(f"[*] Loaded {len(tag_keys)} keys. Starting cross-account scan...") all_results = [] @@ -69,36 +87,36 @@ def main(): continue acc_id = account['Id'] - acc_name = account['Name'] # This is the Org-level name + acc_name = account['Name'] print(f"[*] Scanning Account: {acc_id} ({acc_name})") m_session = get_session_for_account(session, acc_id, args.role_name, partition) if not m_session: continue - # Get IAM Alias (more useful for many teams than the Org Name) alias = get_account_alias(m_session) - display_name = alias if alias else acc_name - # Scan all regions (Resource Groups Tagging API is regional) - ec2 = m_session.client('ec2', region_name=args.region) - regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] + # Get all enabled regions for this account + try: + ec2 = m_session.client('ec2', region_name=args.region) + regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] + except ClientError: + regions = [args.region] # Fallback to provided region for region in regions: tag_client = m_session.client('resourcegroupstaggingapi', region_name=region) tag_paginator = tag_client.get_paginator('get_resources') - # Filter for your specific tag keys + # Tagging API allows up to 50 filters per call tag_filters = [{'Key': k} for k in tag_keys] try: - # Tagging API allows up to 50 filters per call for i in range(0, len(tag_filters), 50): chunk = tag_filters[i:i+50] for tag_page in tag_paginator.paginate(TagFilters=chunk): for r_mapping in tag_page.get('ResourceTagMappingList', []): arn = r_mapping['ResourceARN'] - # Find which specific keys from our list were found on this resource + # Find matching keys found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys] for tag in found_tags: @@ -106,7 +124,7 @@ def main(): "tag_name": tag['Key'], "tag_value": tag['Value'], "account_id": acc_id, - "account_alias": display_name, + "account_alias": alias, "region": region, "arn": arn }) @@ -119,9 +137,9 @@ def main(): # Output to CSV if all_results: - keys = all_results[0].keys() + headers = ["tag_name", "tag_value", "account_id", "account_alias", "region", "arn"] with open('findings.csv', 'w', newline='') as cf: - writer = csv.DictWriter(cf, fieldnames=keys) + writer = csv.DictWriter(cf, fieldnames=headers) writer.writeheader() writer.writerows(all_results)