diff --git a/local-app/python-tools/cross-organization/audit_filter.py b/local-app/python-tools/cross-organization/audit_filter.py index 7263f731..e1e59828 100755 --- a/local-app/python-tools/cross-organization/audit_filter.py +++ b/local-app/python-tools/cross-organization/audit_filter.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import json import argparse @@ -12,6 +12,7 @@ __version__ = "1.1.2" def get_nested(data, key_path): + """Safely retrieves nested values using dot-notated paths.""" keys = key_path.split('.') for key in keys: if isinstance(data, dict): @@ -21,10 +22,10 @@ def get_nested(data, key_path): return data def str_to_bool(val): - """Converts common truthy/falsy strings to actual Booleans.""" + """Converts truthy/falsy strings to actual Booleans for accurate matching.""" if isinstance(val, bool): return val - normalized = str(val).lower() + normalized = str(val).lower().strip() if normalized in ('true', 'yes', 't', 'y', '1', 'on'): return True if normalized in ('false', 'no', 'f', 'n', '0', 'off'): @@ -32,21 +33,21 @@ def str_to_bool(val): return val def check_condition(item, field, operator, value): + """Evaluates a single condition, now with Boolean-awareness.""" actual_value = get_nested(item, field) - if actual_value is None: return operator == "!=" - # TYPE-AWARE CONVERSION: - # If the filter value looks like a boolean, convert BOTH to booleans for comparison + # TYPE-AWARE BOOLEAN MATCHING target_value = str_to_bool(value) if isinstance(target_value, bool): + # Convert the value from JSON to boolean before comparing compare_value = str_to_bool(actual_value) if operator == "==": return compare_value == target_value if operator == "!=": return compare_value != target_value - return False # Regex doesn't apply to pure booleans + return False # Regex matches (~) are not supported for pure booleans - # Standard string comparison for everything else + # Standard string matching str_actual = str(actual_value) if operator == "==": return str_actual == value @@ -59,54 +60,55 @@ def check_condition(item, field, operator, value): return False return False -def parse_filter_string(f_str): - # Regex split that handles operators +def parse_filter(f_str): + """Parses a FIELD{operator}VALUE string into its components.""" match = re.split(r'(==|!=|~)', f_str) return match if len(match) == 3 else None def main(): - parser = argparse.ArgumentParser(description=f"Boolean-Aware Audit Filter v{__version__}") - parser.add_argument("--input", help="JSON file to filter") - parser.add_argument("--and", action="append", dest="and_filters", help="Match ALL") - parser.add_argument("--or", action="append", dest="or_filters", help="Match ANY") - parser.add_argument("--csv", help="Output to CSV") - parser.add_argument("--json", action="store_true", help="Output to JSON") + parser = argparse.ArgumentParser(description=f"Boolean-Aware Logic Filter v{__version__}") + parser.add_argument("--input", help="JSON file (defaults to latest audit file)") + parser.add_argument("--and", action="append", dest="and_filters", help="Must match ALL (AND)") + parser.add_argument("--or", action="append", dest="or_filters", help="Must match AT LEAST ONE (OR)") + parser.add_argument("--csv", help="Output results to CSV") + parser.add_argument("--json", action="store_true", help="Output results as JSON") args = parser.parse_args() - # Resolve latest input file automatically + # Find the latest audit file automatically if not specified input_file = args.input or max(glob.glob("audit_results.*.json"), key=os.path.getctime, default=None) if not input_file: - print("Error: No input file found."); sys.exit(1) + print("Error: No audit JSON files found."); sys.exit(1) with open(input_file, 'r') as f: data = json.load(f) - and_list = [parse_filter_string(f) for f in (args.and_filters or []) if parse_filter_string(f)] - or_list = [parse_filter_string(f) for f in (args.or_filters or []) if parse_filter_string(f)] + and_list = [parse_filter(f) for f in (args.and_filters or []) if parse_filter(f)] + or_list = [parse_filter(f) for f in (args.or_filters or []) if parse_filter(f)] filtered_results = [] for record in data: - # Implicitly true if no filters provided + # Check AND group: Every condition must be True and_passed = all(check_condition(record, f[0], f[1], f[2]) for f in and_list) + + # Check OR group: At least one must be True, or group is empty or_passed = any(check_condition(record, f[0], f[1], f[2]) for f in or_list) if or_list else True if and_passed and or_passed: filtered_results.append(record) if not filtered_results: - print("No matches found.") + print("No matching records found.") return if args.json: print(json.dumps(filtered_results, indent=2)) elif args.csv: - # Simple flattening for baseline CSV with open(args.csv, 'w', newline='') as f: headers = ["org_id", "account_id", "alias", "ou_path"] writer = csv.DictWriter(f, fieldnames=headers, extrasaction='ignore') writer.writeheader() writer.writerows(filtered_results) - print(f"Results saved to {args.csv}") + print(f"Exported {len(filtered_results)} records to {args.csv}") else: for r in filtered_results: print(f"{r.get('account_id')} | {r.get('alias','N/A'):<20} | {r.get('ou_path')}")