-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
122 additions
and
0 deletions.
There are no files selected for viewing
122 changes: 122 additions & 0 deletions
122
local-app/python-tools/cross-organization/audit_filter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| #!/usr/bin/env python | ||
|
|
||
| import json | ||
| import argparse | ||
| import re | ||
| import sys | ||
| import csv | ||
| import os | ||
| import glob | ||
| from pathlib import Path | ||
|
|
||
| # --- VERSIONING --- | ||
| __version__ = "1.0.0" | ||
|
|
||
| def get_nested(data, key_path): | ||
| """Safely retrieves a value from a nested dictionary using a dot-notated path.""" | ||
| keys = key_path.split('.') | ||
| for key in keys: | ||
| if isinstance(data, dict): | ||
| data = data.get(key) | ||
| else: | ||
| return None | ||
| return data | ||
|
|
||
| def apply_filter(item, field, operator, value): | ||
| """Applies a single filter criteria to an audit record.""" | ||
| actual_value = get_nested(item, field) | ||
|
|
||
| # Handle missing fields | ||
| if actual_value is None: | ||
| return operator == "!=" | ||
|
|
||
| str_actual = str(actual_value) | ||
|
|
||
| if operator == "==": | ||
| return str_actual == value | ||
| elif operator == "!=": | ||
| return str_actual != value | ||
| elif operator == "~": | ||
| try: | ||
| return bool(re.search(value, str_actual, re.IGNORECASE)) | ||
| except re.error: | ||
| return False | ||
| return False | ||
|
|
||
| def find_latest_file(pattern): | ||
| files = glob.glob(pattern) | ||
| return max(files, key=os.path.getctime) if files else None | ||
|
|
||
| def main(): | ||
| parser = argparse.ArgumentParser(description=f"Generic Audit Filter Tool v{__version__}") | ||
| parser.add_argument("--input", help="JSON file to filter (defaults to latest check_config/cloudtrail if not specified)") | ||
| parser.add_argument("--filter", action="append", help="Criteria in FIELD{operator}VALUE format (e.g., data.us-east-1.recorder_status==ON)") | ||
| parser.add_argument("--csv", help="Output results to a CSV file") | ||
| parser.add_argument("--json", action="store_true", help="Output results as JSON to console") | ||
| args = parser.parse_args() | ||
|
|
||
| # 1. Resolve Input File | ||
| input_file = args.input | ||
| if not input_file: | ||
| # Try to find any audit JSON in the current directory | ||
| input_file = find_latest_file("audit_results.*.json") | ||
|
|
||
| if not input_file or not os.path.exists(input_file): | ||
| print("Error: No valid input file found.") | ||
| sys.exit(1) | ||
|
|
||
| with open(input_file, 'r') as f: | ||
| data = json.load(f) | ||
|
|
||
| # 2. Parse Filters | ||
| parsed_filters = [] | ||
| if args.filter: | ||
| for f_str in args.filter: | ||
| # Use regex to split by operators while preserving them | ||
| match = re.split(r'(==|!=|~)', f_str) | ||
| if len(match) == 3: | ||
| parsed_filters.append(match) | ||
| else: | ||
| print(f"Warning: Ignoring invalid filter format: {f_str}") | ||
|
|
||
| # 3. Process Data | ||
| filtered_results = [] | ||
| for record in data: | ||
| match_all = True | ||
| for f_field, f_op, f_val in parsed_filters: | ||
| if not apply_filter(record, f_field, f_op, f_val): | ||
| match_all = False | ||
| break | ||
| if match_all: | ||
| filtered_results.append(record) | ||
|
|
||
| # 4. Output Results | ||
| if not filtered_results: | ||
| print("No records matched the criteria.") | ||
| return | ||
|
|
||
| if args.json: | ||
| print(json.dumps(filtered_results, indent=2)) | ||
|
|
||
| elif args.csv: | ||
| # Flatten for CSV (using top level and first region found as sample) | ||
| with open(args.csv, 'w', newline='') as f: | ||
| # We use keys from the first record as headers | ||
| sample = filtered_results[0] | ||
| headers = ["org_id", "account_id", "alias", "ou_path"] | ||
| writer = csv.DictWriter(f, fieldnames=headers, extrasaction='ignore') | ||
| writer.writeheader() | ||
| for row in filtered_results: | ||
| writer.writerow(row) | ||
| print(f"Filtered results saved to {args.csv}") | ||
|
|
||
| else: | ||
| # Default Console View | ||
| print(f"Filtered {len(filtered_results)} records from {input_file}:") | ||
| print("-" * 80) | ||
| for r in filtered_results: | ||
| print(f"{r.get('account_id')} | {r.get('alias','N/A'):<20} | {r.get('ou_path')}") | ||
| print("-" * 80) | ||
|
|
||
| if __name__ == "__main__": | ||
| main() |