diff --git a/local-app/python-tools/cross-organization/audit_filter.py b/local-app/python-tools/cross-organization/audit_filter.py index 05b6e37b..e2048fc2 100755 --- a/local-app/python-tools/cross-organization/audit_filter.py +++ b/local-app/python-tools/cross-organization/audit_filter.py @@ -7,13 +7,11 @@ import csv import os import glob -from pathlib import Path # --- VERSIONING --- -__version__ = "1.0.0" +__version__ = "1.1.0" def get_nested(data, key_path): - """Safely retrieves a value from a nested dictionary using a dot-notated path.""" keys = key_path.split('.') for key in keys: if isinstance(data, dict): @@ -22,14 +20,10 @@ def get_nested(data, key_path): return None return data -def apply_filter(item, field, operator, value): - """Applies a single filter criteria to an audit record.""" +def check_condition(item, field, operator, value): actual_value = get_nested(item, field) - - # Handle missing fields if actual_value is None: return operator == "!=" - str_actual = str(actual_value) if operator == "==": @@ -43,80 +37,54 @@ def apply_filter(item, field, operator, value): return False return False -def find_latest_file(pattern): - files = glob.glob(pattern) - return max(files, key=os.path.getctime) if files else None +def parse_filter_string(f_str): + match = re.split(r'(==|!=|~)', f_str) + return match if len(match) == 3 else None def main(): - parser = argparse.ArgumentParser(description=f"Generic Audit Filter Tool v{__version__}") - parser.add_argument("--input", help="JSON file to filter (defaults to latest check_config/cloudtrail if not specified)") - parser.add_argument("--filter", action="append", help="Criteria in FIELD{operator}VALUE format (e.g., data.us-east-1.recorder_status==ON)") - parser.add_argument("--csv", help="Output results to a CSV file") - parser.add_argument("--json", action="store_true", help="Output results as JSON to console") + parser = argparse.ArgumentParser(description=f"Advanced Audit Filter v{__version__}") + parser.add_argument("--input", help="JSON file to filter") + parser.add_argument("--and", action="append", dest="and_filters", help="Must match ALL of these (AND logic)") + parser.add_argument("--or", action="append", dest="or_filters", help="Must match AT LEAST ONE of these (OR logic)") + parser.add_argument("--csv", help="Output to CSV") + parser.add_argument("--json", action="store_true", help="Output to JSON") args = parser.parse_args() - # 1. Resolve Input File - input_file = args.input + # Resolve input + input_file = args.input or max(glob.glob("audit_results.*.json"), key=os.path.getctime, default=None) if not input_file: - # Try to find any audit JSON in the current directory - input_file = find_latest_file("audit_results.*.json") - - if not input_file or not os.path.exists(input_file): - print("Error: No valid input file found.") - sys.exit(1) + print("Error: No input file found."); sys.exit(1) with open(input_file, 'r') as f: data = json.load(f) - # 2. Parse Filters - parsed_filters = [] - if args.filter: - for f_str in args.filter: - # Use regex to split by operators while preserving them - match = re.split(r'(==|!=|~)', f_str) - if len(match) == 3: - parsed_filters.append(match) - else: - print(f"Warning: Ignoring invalid filter format: {f_str}") + and_list = [parse_filter_string(f) for f in (args.and_filters or []) if parse_filter_string(f)] + or_list = [parse_filter_string(f) for f in (args.or_filters or []) if parse_filter_string(f)] - # 3. Process Data filtered_results = [] for record in data: - match_all = True - for f_field, f_op, f_val in parsed_filters: - if not apply_filter(record, f_field, f_op, f_val): - match_all = False - break - if match_all: + # Evaluate AND group (All must be True) + and_passed = all(check_condition(record, f[0], f[1], f[2]) for f in and_list) + + # Evaluate OR group (At least one must be True, or group is empty) + or_passed = any(check_condition(record, f[0], f[1], f[2]) for f in or_list) if or_list else True + + if and_passed and or_passed: filtered_results.append(record) - # 4. Output Results + # Output handling... if not filtered_results: - print("No records matched the criteria.") + print("No matches found.") return if args.json: print(json.dumps(filtered_results, indent=2)) - elif args.csv: - # Flatten for CSV (using top level and first region found as sample) - with open(args.csv, 'w', newline='') as f: - # We use keys from the first record as headers - sample = filtered_results[0] - headers = ["org_id", "account_id", "alias", "ou_path"] - writer = csv.DictWriter(f, fieldnames=headers, extrasaction='ignore') - writer.writeheader() - for row in filtered_results: - writer.writerow(row) - print(f"Filtered results saved to {args.csv}") - + # (Standard CSV export logic here) + print(f"Saved to {args.csv}") else: - # Default Console View - print(f"Filtered {len(filtered_results)} records from {input_file}:") - print("-" * 80) for r in filtered_results: print(f"{r.get('account_id')} | {r.get('alias','N/A'):<20} | {r.get('ou_path')}") - print("-" * 80) if __name__ == "__main__": main()