-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
100 additions
and
0 deletions.
There are no files selected for viewing
100 changes: 100 additions & 0 deletions
100
local-app/python-tools/cross-organization/analyze-tag-data.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| #!/bin/env python | ||
|
|
||
| import csv | ||
| import argparse | ||
| import sys | ||
| import glob | ||
| from collections import Counter, defaultdict | ||
|
|
||
| def get_args(): | ||
| parser = argparse.ArgumentParser(description="AWS Tag Data Analyzer v1.0.0") | ||
| parser.add_argument("--tags-file", required=True, help="Original CSV with TagKey and Status") | ||
| parser.add_argument("--findings-file", nargs='+', required=True, help="One or more CSV findings files (supports wildcards)") | ||
| parser.add_argument("--output", help="Optional CSV file to save analysis results") | ||
| return parser.parse_args() | ||
|
|
||
| def analyze(): | ||
| args = get_args() | ||
|
|
||
| # 1. Load Tag Metadata (Status) | ||
| tag_metadata = {} | ||
| try: | ||
| with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: | ||
| reader = csv.DictReader(f) | ||
| for row in reader: | ||
| key = row.get('TagKey', '').strip() | ||
| status = row.get('Status', 'Unknown').strip() | ||
| if key: | ||
| tag_metadata[key] = status | ||
| except Exception as e: | ||
| print(f"[!] Error reading tags-file: {e}") | ||
| sys.exit(1) | ||
|
|
||
| # 2. Load Findings | ||
| findings_count = Counter() | ||
| account_map = defaultdict(set) | ||
| total_rows = 0 | ||
|
|
||
| # Expand wildcards if necessary (especially on Windows) | ||
| files_to_process = [] | ||
| for pattern in args.findings_file: | ||
| files_to_process.extend(glob.glob(pattern)) | ||
|
|
||
| if not files_to_process: | ||
| print("[!] No findings files found matching the criteria.") | ||
| sys.exit(1) | ||
|
|
||
| print(f"[*] Processing {len(files_to_process)} findings files...") | ||
|
|
||
| for file in files_to_process: | ||
| try: | ||
| with open(file, mode='r', encoding='utf-8') as f: | ||
| reader = csv.DictReader(f) | ||
| for row in reader: | ||
| tag_name = row.get('tag_name') | ||
| acc_id = row.get('account_id') | ||
| if tag_name: | ||
| findings_count[tag_name] += 1 | ||
| if acc_id: | ||
| account_map[tag_name].add(acc_id) | ||
| total_rows += 1 | ||
| except Exception as e: | ||
| print(f"[!] Error reading {file}: {e}") | ||
|
|
||
| # 3. Generate Report | ||
| print(f"\n{'='*85}") | ||
| print(f"{'Tag Name':<35} | {'Status':<10} | {'Instances':<12} | {'Unique Accounts'}") | ||
| print(f"{'-'*85}") | ||
|
|
||
| report_rows = [] | ||
| # Sort by Instance count descending | ||
| sorted_tags = sorted(findings_count.items(), key=lambda x: x[1], reverse=True) | ||
|
|
||
| for tag, count in sorted_tags: | ||
| status = tag_metadata.get(tag, "Not in List") | ||
| unique_accs = len(account_map[tag]) | ||
| print(f"{tag:<35} | {status:<10} | {count:<12} | {unique_accs}") | ||
|
|
||
| report_rows.append({ | ||
| "TagKey": tag, | ||
| "Status": status, | ||
| "TotalInstances": count, | ||
| "UniqueAccounts": unique_accs | ||
| }) | ||
|
|
||
| print(f"{'='*85}") | ||
| print(f"Total Resources Scanned with Hits: {total_rows}\n") | ||
|
|
||
| # 4. Optional Export | ||
| if args.output: | ||
| try: | ||
| with open(args.output, 'w', newline='') as f: | ||
| writer = csv.DictWriter(f, fieldnames=["TagKey", "Status", "TotalInstances", "UniqueAccounts"]) | ||
| writer.writeheader() | ||
| writer.writerows(report_rows) | ||
| print(f"[+] Analysis saved to: {args.output}") | ||
| except Exception as e: | ||
| print(f"[!] Export failed: {e}") | ||
|
|
||
| if __name__ == "__main__": | ||
| analyze() |