From ddb458d796e2b6f32f66d0cf4e745c2c980e76f8 Mon Sep 17 00:00:00 2001 From: badra001 Date: Fri, 27 Mar 2026 11:07:45 -0400 Subject: [PATCH] assess_check_scheduling.py fix file name --- .../assess_check_scheduling.py | 119 +++++++++--------- 1 file changed, 61 insertions(+), 58 deletions(-) diff --git a/local-app/python-tools/cross-organization/assess_check_scheduling.py b/local-app/python-tools/cross-organization/assess_check_scheduling.py index bbb1d82d..5308ae2c 100755 --- a/local-app/python-tools/cross-organization/assess_check_scheduling.py +++ b/local-app/python-tools/cross-organization/assess_check_scheduling.py @@ -7,112 +7,115 @@ from collections import defaultdict # --- VERSIONING --- -__version__ = "1.7.1" +__version__ = "1.7.3" def get_tag(tags, key, default=""): """Case-insensitive tag lookup.""" + if not tags: return default + if isinstance(tags, list): + # Convert list of {'Key': 'x', 'Value': 'y'} to dict if necessary + tags = {t['Key']: t['Value'] for t in tags if 'Key' in t} for k, v in tags.items(): if k.lower() == key.lower(): return v.strip() if v else default return default def is_schedule_enabled(power_schedule): - """ - Logic: True if schedule is NOT 'Always_on' or 'No Schedule'. - """ + """True if schedule is NOT 'Always_on' or 'No Schedule'.""" val = str(power_schedule).lower().strip() return val not in ["always_on", "no schedule", ""] -def generate_csv(data, fields, filename): - """Standard CSV writer for resource categories.""" - try: - with open(filename, 'w', newline='') as f: - writer = csv.DictWriter(f, fieldnames=fields) - writer.writeheader() - writer.writerows(data) - return True - except Exception as e: - print(f" Error writing {filename}: {e}") - return False - def generate_master_resource_csv(all_resources, ts): - """ - Generates the comprehensive resource/FinOps inventory. - """ + """Generates the comprehensive resource/FinOps inventory.""" filename = f"scheduling_summary.resources.{ts}.csv" target_tags = ["PowerSchedule", "Environment", "Name", "finops_project_number", "finops_project_name"] fields = ["account_id", "region", "type", "arn"] + target_tags - rows = [] - for res in all_resources: - row = { - "account_id": res['account_id'], - "region": res['region'], - "type": res['type'], - "arn": res['arn'] - } - for tag in target_tags: - row[tag] = get_tag(res.get('tags', {}), tag) - rows.append(row) - - generate_csv(rows, fields, filename) - return filename + try: + with open(filename, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=fields) + writer.writeheader() + for res in all_resources: + row = { + "account_id": res.get('account_id', 'N/A'), + "region": res.get('region', 'N/A'), + "type": res.get('type', 'N/A'), + "arn": res.get('arn', 'N/A') + } + for tag in target_tags: + row[tag] = get_tag(res.get('tags', {}), tag) + writer.writerow(row) + return filename + except Exception as e: + return f"Error: {str(e)}" def main(): ts = datetime.now().strftime("%Y%m%dT%H%M%S") all_res = [] - # Search for all JSON audit results - for file_path in glob.glob("audit_results/*.json"): + + # MATCHING THE SPECIFIC FILENAME PATTERN + audit_files = glob.glob("audit_results.check_scheduling.*.json") + + if not audit_files: + print(f"❌ Error: No files matching 'audit_results.check_scheduling.*.json' found.") + return + + print(f"Processing {len(audit_files)} audit file(s)...") + + for file_path in audit_files: with open(file_path, 'r') as f: try: - data = json.load(f) - # Assuming audit structure: data['account_id'], data['resources'][] - all_res.extend(data.get('resources', [])) - except: continue + raw_data = json.load(f) + + # Navigate the org_runner JSON structure + # It usually looks like: {"account_id": {"data": {...}, "alias": "..."}} + for account_id, account_data in raw_data.items(): + if isinstance(account_data, dict) and "data" in account_data: + # Extract the resources from the 'data' block + for res_id, res_info in account_data["data"].items(): + if res_id == "account_summary": continue + + # Ensure we carry the account_id into the resource object + res_info['account_id'] = account_id + all_res.append(res_info) + except Exception as e: + print(f" Skipping {file_path} due to error: {e}") if not all_res: - print("No audit data found in audit_results/"); return + print("❌ Error: Files were found but no resource data was extracted. Check JSON structure."); return - # --- REPORT 1: FINOPS HEALTH --- + # --- FINOPS HEALTH REPORT --- missing_num = sum(1 for r in all_res if not get_tag(r.get('tags', {}), 'finops_project_number')) missing_name = sum(1 for r in all_res if not get_tag(r.get('tags', {}), 'finops_project_name')) - print("\n" + "=" * 80) + print("\n" + "=" * 85) print(f"FINOPS TAGGING HEALTH (Total Resources: {len(all_res)})") - print("-" * 80) + print("-" * 85) print(f" Missing 'finops_project_number': {missing_num:>5}") print(f" Missing 'finops_project_name': {missing_name:>5}") print(f" Current Tagging Compliance: {((len(all_res)-missing_num)/len(all_res))*100:.1f}%") - # --- REPORT 2: COMPLIANCE MATRIX --- + # --- SCHEDULING MATRIX --- matrix = defaultdict(lambda: {"total": 0, "scheduled": 0}) for res in all_res: env = get_tag(res.get('tags', {}), 'Environment', 'Unknown') sched = get_tag(res.get('tags', {}), 'PowerSchedule') - matrix[env]["total"] += 1 if is_schedule_enabled(sched): matrix[env]["scheduled"] += 1 print("\nSCHEDULING COMPLIANCE MATRIX") - print(f"{'Environment':<20} | {'Total':<8} | {'Scheduled':<12} | {'Compliance %'}") - print("-" * 80) + print(f"{'Environment':<25} | {'Total':<8} | {'Scheduled':<12} | {'Compliance %'}") + print("-" * 85) for env, counts in sorted(matrix.items()): pct = (counts['scheduled'] / counts['total']) * 100 - print(f"{env:<20} | {counts['total']:<8} | {counts['scheduled']:<12} | {pct:.1f}%") + print(f"{env:<25} | {counts['total']:<8} | {counts['scheduled']:<12} | {pct:.1f}%") # --- CSV GENERATION --- print("\nGENERATING REPORTS...") - # 1. Master Resources - master = generate_master_resource_csv(all_res, ts) - - # 2. Categorized CSVs (Examples shown for logic) - # Filter all_res into sub-lists (asg_ec2, eks_ec2, plain_ec2, rds) based on your logic - # generate_csv(plain_list, fields, f"scheduling_summary.plain_ec2.{ts}.csv") - - print(f" [DONE] Created Master Inventory: {master}") - print(f" [DONE] Created Categorized Reports (4 files)") - print("=" * 80 + "\n") + master_file = generate_master_resource_csv(all_res, ts) + print(f" [DONE] Created Master Inventory: {master_file}") + print("=" * 85 + "\n") if __name__ == "__main__": main()