Skip to content

Commit

Permalink
add --and and --or filters
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 2, 2026
1 parent 0b77fa0 commit 233faaa
Showing 1 changed file with 27 additions and 59 deletions.
86 changes: 27 additions & 59 deletions local-app/python-tools/cross-organization/audit_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@
import csv
import os
import glob
from pathlib import Path

# --- VERSIONING ---
__version__ = "1.0.0"
__version__ = "1.1.0"

def get_nested(data, key_path):
"""Safely retrieves a value from a nested dictionary using a dot-notated path."""
keys = key_path.split('.')
for key in keys:
if isinstance(data, dict):
Expand All @@ -22,14 +20,10 @@ def get_nested(data, key_path):
return None
return data

def apply_filter(item, field, operator, value):
"""Applies a single filter criteria to an audit record."""
def check_condition(item, field, operator, value):
actual_value = get_nested(item, field)

# Handle missing fields
if actual_value is None:
return operator == "!="

str_actual = str(actual_value)

if operator == "==":
Expand All @@ -43,80 +37,54 @@ def apply_filter(item, field, operator, value):
return False
return False

def find_latest_file(pattern):
files = glob.glob(pattern)
return max(files, key=os.path.getctime) if files else None
def parse_filter_string(f_str):
match = re.split(r'(==|!=|~)', f_str)
return match if len(match) == 3 else None

def main():
parser = argparse.ArgumentParser(description=f"Generic Audit Filter Tool v{__version__}")
parser.add_argument("--input", help="JSON file to filter (defaults to latest check_config/cloudtrail if not specified)")
parser.add_argument("--filter", action="append", help="Criteria in FIELD{operator}VALUE format (e.g., data.us-east-1.recorder_status==ON)")
parser.add_argument("--csv", help="Output results to a CSV file")
parser.add_argument("--json", action="store_true", help="Output results as JSON to console")
parser = argparse.ArgumentParser(description=f"Advanced Audit Filter v{__version__}")
parser.add_argument("--input", help="JSON file to filter")
parser.add_argument("--and", action="append", dest="and_filters", help="Must match ALL of these (AND logic)")
parser.add_argument("--or", action="append", dest="or_filters", help="Must match AT LEAST ONE of these (OR logic)")
parser.add_argument("--csv", help="Output to CSV")
parser.add_argument("--json", action="store_true", help="Output to JSON")
args = parser.parse_args()

# 1. Resolve Input File
input_file = args.input
# Resolve input
input_file = args.input or max(glob.glob("audit_results.*.json"), key=os.path.getctime, default=None)
if not input_file:
# Try to find any audit JSON in the current directory
input_file = find_latest_file("audit_results.*.json")

if not input_file or not os.path.exists(input_file):
print("Error: No valid input file found.")
sys.exit(1)
print("Error: No input file found."); sys.exit(1)

with open(input_file, 'r') as f:
data = json.load(f)

# 2. Parse Filters
parsed_filters = []
if args.filter:
for f_str in args.filter:
# Use regex to split by operators while preserving them
match = re.split(r'(==|!=|~)', f_str)
if len(match) == 3:
parsed_filters.append(match)
else:
print(f"Warning: Ignoring invalid filter format: {f_str}")
and_list = [parse_filter_string(f) for f in (args.and_filters or []) if parse_filter_string(f)]
or_list = [parse_filter_string(f) for f in (args.or_filters or []) if parse_filter_string(f)]

# 3. Process Data
filtered_results = []
for record in data:
match_all = True
for f_field, f_op, f_val in parsed_filters:
if not apply_filter(record, f_field, f_op, f_val):
match_all = False
break
if match_all:
# Evaluate AND group (All must be True)
and_passed = all(check_condition(record, f[0], f[1], f[2]) for f in and_list)

# Evaluate OR group (At least one must be True, or group is empty)
or_passed = any(check_condition(record, f[0], f[1], f[2]) for f in or_list) if or_list else True

if and_passed and or_passed:
filtered_results.append(record)

# 4. Output Results
# Output handling...
if not filtered_results:
print("No records matched the criteria.")
print("No matches found.")
return

if args.json:
print(json.dumps(filtered_results, indent=2))

elif args.csv:
# Flatten for CSV (using top level and first region found as sample)
with open(args.csv, 'w', newline='') as f:
# We use keys from the first record as headers
sample = filtered_results[0]
headers = ["org_id", "account_id", "alias", "ou_path"]
writer = csv.DictWriter(f, fieldnames=headers, extrasaction='ignore')
writer.writeheader()
for row in filtered_results:
writer.writerow(row)
print(f"Filtered results saved to {args.csv}")

# (Standard CSV export logic here)
print(f"Saved to {args.csv}")
else:
# Default Console View
print(f"Filtered {len(filtered_results)} records from {input_file}:")
print("-" * 80)
for r in filtered_results:
print(f"{r.get('account_id')} | {r.get('alias','N/A'):<20} | {r.get('ou_path')}")
print("-" * 80)

if __name__ == "__main__":
main()

0 comments on commit 233faaa

Please sign in to comment.