diff --git a/local-app/python-tools/cross-organization/audit_filter.py b/local-app/python-tools/cross-organization/audit_filter.py new file mode 100755 index 00000000..05b6e37b --- /dev/null +++ b/local-app/python-tools/cross-organization/audit_filter.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python + +import json +import argparse +import re +import sys +import csv +import os +import glob +from pathlib import Path + +# --- VERSIONING --- +__version__ = "1.0.0" + +def get_nested(data, key_path): + """Safely retrieves a value from a nested dictionary using a dot-notated path.""" + keys = key_path.split('.') + for key in keys: + if isinstance(data, dict): + data = data.get(key) + else: + return None + return data + +def apply_filter(item, field, operator, value): + """Applies a single filter criteria to an audit record.""" + actual_value = get_nested(item, field) + + # Handle missing fields + if actual_value is None: + return operator == "!=" + + str_actual = str(actual_value) + + if operator == "==": + return str_actual == value + elif operator == "!=": + return str_actual != value + elif operator == "~": + try: + return bool(re.search(value, str_actual, re.IGNORECASE)) + except re.error: + return False + return False + +def find_latest_file(pattern): + files = glob.glob(pattern) + return max(files, key=os.path.getctime) if files else None + +def main(): + parser = argparse.ArgumentParser(description=f"Generic Audit Filter Tool v{__version__}") + parser.add_argument("--input", help="JSON file to filter (defaults to latest check_config/cloudtrail if not specified)") + parser.add_argument("--filter", action="append", help="Criteria in FIELD{operator}VALUE format (e.g., data.us-east-1.recorder_status==ON)") + parser.add_argument("--csv", help="Output results to a CSV file") + parser.add_argument("--json", action="store_true", help="Output results as JSON to console") + args = parser.parse_args() + + # 1. Resolve Input File + input_file = args.input + if not input_file: + # Try to find any audit JSON in the current directory + input_file = find_latest_file("audit_results.*.json") + + if not input_file or not os.path.exists(input_file): + print("Error: No valid input file found.") + sys.exit(1) + + with open(input_file, 'r') as f: + data = json.load(f) + + # 2. Parse Filters + parsed_filters = [] + if args.filter: + for f_str in args.filter: + # Use regex to split by operators while preserving them + match = re.split(r'(==|!=|~)', f_str) + if len(match) == 3: + parsed_filters.append(match) + else: + print(f"Warning: Ignoring invalid filter format: {f_str}") + + # 3. Process Data + filtered_results = [] + for record in data: + match_all = True + for f_field, f_op, f_val in parsed_filters: + if not apply_filter(record, f_field, f_op, f_val): + match_all = False + break + if match_all: + filtered_results.append(record) + + # 4. Output Results + if not filtered_results: + print("No records matched the criteria.") + return + + if args.json: + print(json.dumps(filtered_results, indent=2)) + + elif args.csv: + # Flatten for CSV (using top level and first region found as sample) + with open(args.csv, 'w', newline='') as f: + # We use keys from the first record as headers + sample = filtered_results[0] + headers = ["org_id", "account_id", "alias", "ou_path"] + writer = csv.DictWriter(f, fieldnames=headers, extrasaction='ignore') + writer.writeheader() + for row in filtered_results: + writer.writerow(row) + print(f"Filtered results saved to {args.csv}") + + else: + # Default Console View + print(f"Filtered {len(filtered_results)} records from {input_file}:") + print("-" * 80) + for r in filtered_results: + print(f"{r.get('account_id')} | {r.get('alias','N/A'):<20} | {r.get('ou_path')}") + print("-" * 80) + +if __name__ == "__main__": + main()