Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 14, 2026
1 parent a05f901 commit 226586d
Showing 1 changed file with 78 additions and 81 deletions.
159 changes: 78 additions & 81 deletions local-app/python-tools/cross-organization/tag-checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import json
import argparse
import sys
import time
from datetime import datetime
from botocore.exceptions import ClientError

__version__ = "1.0.2"
__version__ = "1.0.3"

def get_args():
parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}")
Expand All @@ -30,8 +32,7 @@ def get_session_for_account(management_session, account_id, role_name, partition
aws_secret_access_key=response['Credentials']['SecretAccessKey'],
aws_session_token=response['Credentials']['SessionToken']
)
except ClientError as e:
print(f" [!] Could not assume role for {account_id}: {e}")
except ClientError:
return None

def get_account_alias(session):
Expand All @@ -40,111 +41,107 @@ def get_account_alias(session):
aliases = iam.list_account_aliases()['AccountAliases']
return aliases[0] if aliases else "No Alias"
except ClientError:
return "Unknown/Unauthorized"
return "Unknown"

def main():
args = get_args()
print(f"[*] Initializing Tag Scanner v{__version__}")
start_time_overall = time.time()

# Initialize Management Session
try:
session = boto3.Session(profile_name=args.profile)
org_client = session.client('organizations')
sts_client = session.client('sts')
identity = sts_client.get_caller_identity()
partition = identity['Arn'].split(':')[1]
except Exception as e:
print(f"[!] Initialization failed: {e}")
sys.exit(1)
session = boto3.Session(profile_name=args.profile)
org_client = session.client('organizations')
partition = session.client('sts').get_caller_identity()['Arn'].split(':')[1]

# Load Tag Keys from CSV (First Column)
# Load Tag Keys
tag_keys = []
try:
with open(args.tags_file, mode='r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
next(reader) # Skip Header: Tag Key,Status,Type,Last updated date,Last used month
for row in reader:
if row:
# Strip quotes and whitespace
tag_keys.append(row[0].strip().replace('"', '').replace("'", ""))
except Exception as e:
print(f"[!] Error reading tags file: {e}")
sys.exit(1)
with open(args.tags_file, mode='r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
next(reader) # Skip header
tag_keys = [row[0].strip() for row in reader if row]

if not tag_keys:
print("[!] No tag keys found in the input file.")
sys.exit(0)

print(f"[*] Loaded {len(tag_keys)} keys. Starting cross-account scan...")
print(f"[*] Starting Scan v{__version__} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

all_results = []

# Iterate Accounts
account_metrics = []
total_resources_found = 0

paginator = org_client.get_paginator('list_accounts')
for page in paginator.paginate():
for account in page['Accounts']:
if account['Status'] != 'ACTIVE':
continue

acc_id = account['Id']
acc_name = account['Name']
print(f"[*] Scanning Account: {acc_id} ({acc_name})")
if account['Status'] != 'ACTIVE': continue

acc_start_time = time.time()
acc_id, acc_name = account['Id'], account['Name']
print(f" --> Processing {acc_id} ({acc_name})...", end="\r")

m_session = get_session_for_account(session, acc_id, args.role_name, partition)
if not m_session:
print(f" [!] Skipped {acc_id}: Access Denied")
continue

alias = get_account_alias(m_session)

# Get all enabled regions for this account
resources_in_account = 0

# Regional Scan
try:
ec2 = m_session.client('ec2', region_name=args.region)
regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]
except ClientError:
regions = [args.region] # Fallback to provided region
regions = [args.region]

for region in regions:
tag_client = m_session.client('resourcegroupstaggingapi', region_name=region)
tag_paginator = tag_client.get_paginator('get_resources')

# Tagging API allows up to 50 filters per call
tag_filters = [{'Key': k} for k in tag_keys]

try:
for i in range(0, len(tag_filters), 50):
chunk = tag_filters[i:i+50]
for tag_page in tag_paginator.paginate(TagFilters=chunk):
for r_mapping in tag_page.get('ResourceTagMappingList', []):
arn = r_mapping['ResourceARN']
# Find matching keys
found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys]

for tag in found_tags:
all_results.append({
"tag_name": tag['Key'],
"tag_value": tag['Value'],
"account_id": acc_id,
"account_alias": alias,
"region": region,
"arn": arn
})
except Exception as e:
print(f" [!] Error in {region}: {e}")
# We filter by 50 tags per API call (AWS limit)
for i in range(0, len(tag_keys), 50):
chunk = [{'Key': k} for k in tag_keys[i:i+50]]
for tag_page in tag_paginator.paginate(TagFilters=chunk):
for r_mapping in tag_page.get('ResourceTagMappingList', []):
resources_in_account += 1
arn = r_mapping['ResourceARN']
found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys]
for tag in found_tags:
all_results.append({
"tag_name": tag['Key'], "tag_value": tag['Value'],
"account_id": acc_id, "account_alias": alias,
"region": region, "arn": arn
})

# Output to JSON
with open('findings.json', 'w') as jf:
json.dump(all_results, jf, indent=4)

# Output to CSV
acc_elapsed = time.time() - acc_start_time
total_resources_found += resources_in_account
account_metrics.append({
"account_id": acc_id,
"account_name": acc_name,
"account_alias": alias,
"resources_with_target_tags": resources_in_account,
"elapsed_seconds": round(acc_elapsed, 2)
})

# Summary Generation
total_elapsed = time.time() - start_time_overall
summary = {
"version": __version__,
"timestamp": datetime.now().isoformat(),
"total_accounts_scanned": len(account_metrics),
"total_target_resources_found": total_resources_found,
"total_elapsed_seconds": round(total_elapsed, 2),
"average_seconds_per_account": round(total_elapsed / len(account_metrics), 2) if account_metrics else 0,
"account_details": account_metrics
}

# Save Outputs
with open('findings.json', 'w') as f: json.dump(all_results, f, indent=4)
with open('summary_metrics.json', 'w') as f: json.dump(summary, f, indent=4)
if all_results:
headers = ["tag_name", "tag_value", "account_id", "account_alias", "region", "arn"]
with open('findings.csv', 'w', newline='') as cf:
writer = csv.DictWriter(cf, fieldnames=headers)
writer.writeheader()
writer.writerows(all_results)

print(f"\n[+] Scan Complete. Found {len(all_results)} instances.")
print("[+] Files generated: findings.json, findings.csv")
with open('findings.csv', 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=all_results[0].keys())
writer.writeheader(); writer.writerows(all_results)

print("\n" + "="*40)
print(f"SCAN SUMMARY (v{__version__})")
print(f"Total Accounts: {summary['total_accounts_scanned']}")
print(f"Total Resources: {summary['total_target_resources_found']}")
print(f"Total Time: {summary['total_elapsed_seconds']}s")
print("="*40)

if __name__ == "__main__":
main()

0 comments on commit 226586d

Please sign in to comment.