Skip to content

Commit

Permalink
assess_check_scheduling.py
Browse files Browse the repository at this point in the history
fix file name
  • Loading branch information
badra001 committed Mar 27, 2026
1 parent 3ed80e8 commit ddb458d
Showing 1 changed file with 61 additions and 58 deletions.
119 changes: 61 additions & 58 deletions local-app/python-tools/cross-organization/assess_check_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,112 +7,115 @@
from collections import defaultdict

# --- VERSIONING ---
__version__ = "1.7.3"

def get_tag(tags, key, default=""):
    """Case-insensitive tag lookup.

    Accepts either a plain ``{key: value}`` dict or the AWS list form
    ``[{'Key': k, 'Value': v}, ...]``. Returns the stripped value for
    *key* (compared case-insensitively), or *default* when the tag is
    missing or its value is empty/None.
    """
    if not tags:
        return default
    if isinstance(tags, list):
        # Normalize the AWS list form to a dict. Use .get('Value', '')
        # because a malformed entry can carry 'Key' without 'Value',
        # which would otherwise raise KeyError despite the guard.
        tags = {t['Key']: t.get('Value', '') for t in tags if 'Key' in t}
    for k, v in tags.items():
        if k.lower() == key.lower():
            # Empty/None values fall back to the default.
            return v.strip() if v else default
    return default

def is_schedule_enabled(power_schedule):
    """True if schedule is NOT 'Always_on' or 'No Schedule'.

    Comparison is case-insensitive and whitespace-trimmed; an empty
    value also counts as "not scheduled".
    NOTE(review): a literal None becomes the string "none" and is
    treated as scheduled — confirm that is intended for untagged
    resources.
    """
    val = str(power_schedule).lower().strip()
    return val not in ("always_on", "no schedule", "")

def generate_csv(data, fields, filename):
    """Standard CSV writer for resource categories.

    Writes *data* (list of dicts) under the header *fields*.
    Returns True on success, False on any failure — errors are
    printed rather than raised, so callers treat this as best-effort.
    """
    try:
        with open(filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fields)
            writer.writeheader()
            writer.writerows(data)
        return True
    except Exception as e:
        # Report the actual target filename (was the literal "(unknown)").
        print(f"   Error writing {filename}: {e}")
        return False

def generate_master_resource_csv(all_resources, ts):
    """Generate the comprehensive resource/FinOps inventory CSV.

    Writes one row per resource with the core identity columns plus
    the target governance tags (looked up case-insensitively via
    get_tag). Missing identity fields default to 'N/A'.

    Returns the generated filename on success, or an ``"Error: ..."``
    string on failure — callers only print the result, so no
    exception escapes.
    """
    filename = f"scheduling_summary.resources.{ts}.csv"
    target_tags = ["PowerSchedule", "Environment", "Name",
                   "finops_project_number", "finops_project_name"]
    fields = ["account_id", "region", "type", "arn"] + target_tags

    try:
        with open(filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fields)
            writer.writeheader()
            # Stream row-by-row rather than building an intermediate list.
            for res in all_resources:
                row = {
                    "account_id": res.get('account_id', 'N/A'),
                    "region": res.get('region', 'N/A'),
                    "type": res.get('type', 'N/A'),
                    "arn": res.get('arn', 'N/A'),
                }
                for tag in target_tags:
                    row[tag] = get_tag(res.get('tags', {}), tag)
                writer.writerow(row)
        return filename
    except Exception as e:
        return f"Error: {str(e)}"

def main():
    """Aggregate check_scheduling audit JSON files into console reports
    and a master inventory CSV.

    Reads every ``audit_results.check_scheduling.*.json`` in the current
    directory, flattens the org_runner structure into a resource list,
    then prints a FinOps tagging-health summary and a per-Environment
    scheduling compliance matrix before writing the master CSV.
    """
    ts = datetime.now().strftime("%Y%m%dT%H%M%S")
    all_res = []

    # Match the specific filename pattern emitted by the audit step.
    audit_files = glob.glob("audit_results.check_scheduling.*.json")

    if not audit_files:
        print("❌ Error: No files matching 'audit_results.check_scheduling.*.json' found.")
        return

    print(f"Processing {len(audit_files)} audit file(s)...")

    for file_path in audit_files:
        # Best-effort per file: an unreadable or malformed file is
        # reported and skipped, never aborts the whole run.
        try:
            with open(file_path, 'r') as f:
                raw_data = json.load(f)

            # org_runner JSON shape (per SOURCE comment):
            # {"account_id": {"data": {...}, "alias": "..."}}
            for account_id, account_data in raw_data.items():
                if isinstance(account_data, dict) and "data" in account_data:
                    for res_id, res_info in account_data["data"].items():
                        # 'account_summary' is metadata, not a resource.
                        if res_id == "account_summary":
                            continue
                        # Carry the owning account_id into the resource record.
                        res_info['account_id'] = account_id
                        all_res.append(res_info)
        except Exception as e:
            print(f"   Skipping {file_path} due to error: {e}")

    if not all_res:
        print("❌ Error: Files were found but no resource data was extracted. Check JSON structure.")
        return

    # --- FINOPS HEALTH REPORT ---
    missing_num = sum(1 for r in all_res if not get_tag(r.get('tags', {}), 'finops_project_number'))
    missing_name = sum(1 for r in all_res if not get_tag(r.get('tags', {}), 'finops_project_name'))

    print("\n" + "=" * 85)
    print(f"FINOPS TAGGING HEALTH (Total Resources: {len(all_res)})")
    print("-" * 85)
    print(f"  Missing 'finops_project_number': {missing_num:>5}")
    print(f"  Missing 'finops_project_name':   {missing_name:>5}")
    # Safe: all_res is non-empty past the guard above.
    print(f"  Current Tagging Compliance: {((len(all_res) - missing_num) / len(all_res)) * 100:.1f}%")

    # --- SCHEDULING MATRIX ---
    matrix = defaultdict(lambda: {"total": 0, "scheduled": 0})
    for res in all_res:
        env = get_tag(res.get('tags', {}), 'Environment', 'Unknown')
        sched = get_tag(res.get('tags', {}), 'PowerSchedule')

        matrix[env]["total"] += 1
        if is_schedule_enabled(sched):
            matrix[env]["scheduled"] += 1

    print("\nSCHEDULING COMPLIANCE MATRIX")
    print(f"{'Environment':<25} | {'Total':<8} | {'Scheduled':<12} | {'Compliance %'}")
    print("-" * 85)
    for env, counts in sorted(matrix.items()):
        # counts['total'] >= 1 by construction, so no zero division.
        pct = (counts['scheduled'] / counts['total']) * 100
        print(f"{env:<25} | {counts['total']:<8} | {counts['scheduled']:<12} | {pct:.1f}%")

    # --- CSV GENERATION ---
    print("\nGENERATING REPORTS...")
    master_file = generate_master_resource_csv(all_res, ts)
    print(f"  [DONE] Created Master Inventory: {master_file}")
    print("=" * 85 + "\n")

if __name__ == "__main__":
    main()

0 comments on commit ddb458d

Please sign in to comment.