Source code for oci_policy_analysis.logic.policy_intelligence

##########################################################################
# Copyright (c) 2024, Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
# policy_intelligence.py
#
# Encapsulates after-load analysis, intelligence, and reporting logic for policies.
#
# @author: Andrew Gregory
#
# Supports Python 3.12 and above
#
# coding: utf-8
##########################################################################

import time

from oci_policy_analysis.common.logger import get_logger
from oci_policy_analysis.common.models import PolicyIntelligence, PolicyOverlap
from oci_policy_analysis.logic.reference_data_repo import ReferenceDataRepo

logger = get_logger(component='policy_intelligence')


[docs] class PolicyIntelligenceEngine: """ Provides post-load intelligence, overlap analysis, and advanced policy insights on OCI policies. This engine operates on a loaded PolicyAnalysisRepository and delivers advanced analytics such as risk scores, overlaps, recommendations, and policy hygiene findings. Args: policy_repo (PolicyAnalysisRepository): The repository containing loaded compartment, policy, and identity data. Attributes: policy_repo (PolicyAnalysisRepository): Source of OCI policy, identity, and compartment data. overlay (PolicyIntelligence): Stores the full results of all analytics (risk, overlaps, recommendations, etc). permissions_report (dict): Holds the effective permissions report used by various UI components. """ def __init__(self, policy_repo): """ Initialize the PolicyIntelligenceEngine and prepare analytics overlay structures. Args: policy_repo (PolicyAnalysisRepository): Repository with loaded compartment, policy, and identity data. """ self.policy_repo = policy_repo # Explicitly type and instantiate the overlay using the model self.overlay: PolicyIntelligence = PolicyIntelligence( overlaps=[], recommendations=[], risk_scores=[], consolidations=[] ) self.permissions_report = {} logger.info('Initialized PolicyIntelligenceEngine with repo.')
[docs] def build_permissions_report(self): # noqa: C901 """ Build a detailed nested report of effective permissions for all policy statements. Scans all statements, resolves permissions and subjects, and groups by effective path and subject. Stores both the structured report and supporting lookup maps in self.permissions_report. Returns: dict: A structure with keys "report", "resource_map", "perm_conditionals", and "perm_statements" containing all effective allow/deny permissions and metadata, or empty dicts if no data is present. """ logger.info('Building permissions report data (centralized)') permission_reference_repo = ReferenceDataRepo() report = {} resource_map = {} perm_conditionals = {} # (path,subject,perm) -> True/False perm_statements = {} # (path,subject,perm) -> statement_text statements = self.policy_repo.regular_statements for _idx, stmt in enumerate(statements): try: effective_path = stmt.get('effective_path') if not effective_path or effective_path is None: effective_path = 'UNKNOWN' action = stmt.get('action', 'allow').lower() subjects = stmt.get('subject', []) subject_type = stmt.get('subject_type', 'unknown') permissions = stmt.get('permission', []) resource_str = stmt.get('resource') statement_text = stmt.get('statement_text', '') is_conditional = bool(stmt.get('conditions')) if not permissions: resource = stmt.get('resource', '') verb = stmt.get('verb', '') if resource and verb: permissions = permission_reference_repo.get_permissions( entity=resource, verb=verb, action=action ) if not permissions: permissions = [f'{verb.upper()}_{resource.upper()}'] else: permissions = ['UNKNOWN_PERMISSION'] if subjects is None: subjects = [('Default', 'UNKNOWN')] for subject_domain, subject_name in subjects: domain_str = str(subject_domain) if subject_domain else 'Default' subject_key = f'{subject_type}:{domain_str}/{subject_name}' if effective_path not in report: report[effective_path] = {} if subject_key not in report[effective_path]: report[effective_path][subject_key] = {'allow': set(), 'deny': set()} if action == 'deny': report[effective_path][subject_key]['deny'].update(permissions) else: report[effective_path][subject_key]['allow'].update(permissions) for perm in permissions: perm_conditionals[(effective_path, subject_key, perm)] = is_conditional perm_statements[(effective_path, subject_key, perm)] = statement_text rsrc = resource_str if resource_str else ('Permissions (select row)' if permissions else '') resource_map[(effective_path, subject_key)] = rsrc except Exception: pass for path in report: for subject in report[path]: report[path][subject]['allow'] = sorted( [perm for perm in list(report[path][subject]['allow']) if perm is not None] ) report[path][subject]['deny'] = sorted( [perm for perm in list(report[path][subject]['deny']) if perm is not None] ) # Store both the report and auxiliary maps for selection/detail lookups: self.permissions_report = { 'report': report, 'resource_map': resource_map, 'perm_conditionals': perm_conditionals, 'perm_statements': perm_statements, } return self.permissions_report
[docs] def run_dg_in_use_analysis(self): """ Analyzes Dynamic Group data for unused Dynamic Groups. Should be called after repo is loaded and statements parsed. """ # Build a list of all subjects as list(tuple(domain,name)) all_subjects: list[tuple] = [] for st in self.policy_repo.regular_statements: subject_list = st.get('subject') or [] subject_type = st.get('subject_type') logger.debug(f'SubType: {subject_type} Subject: {subject_list}') if subject_type == 'dynamic-group': logger.debug(f'Add: {subject_type} Subject: {subject_list}') all_subjects.extend(subject_list) logger.debug(f'Subject Count for DG in-use analysis: {len(all_subjects)}') # Iterate all DGs, look at their Domain and Name, then look through each statement unused_dynamic_groups = 0 for dg in self.policy_repo.dynamic_groups: dg_domain = dg.get('domain_name') or 'default' dg_name = dg.get('dynamic_group_name') in_use = False # Will be true at end if it exists for subj_domain, subj_name in all_subjects: logger.debug(f'Compare {dg_domain} = {subj_domain} and {dg_name} = {subj_name}') if dg_domain.casefold() == subj_domain.casefold() and dg_name.casefold() == subj_name.casefold(): in_use = True break if not in_use: logger.info(f'Dynamic Group {dg_domain}/{dg_name} not in use') dg['in_use'] = False unused_dynamic_groups += 1 logger.info(f'Found {unused_dynamic_groups} unused dynamic groups')
[docs] def analyze_policy_overlap(self): # noqa: C901 """ Analyze policy statements for potential overlaps, calling after all statements loaded/parsed. Stores result in self.overlay["overlaps"] using the new PolicyIntelligence overlay structure. """ logger.info('Analyzing policy overlaps and building overlay structure') start_time = time.perf_counter() repo = self.policy_repo self.overlay['overlaps'] = [] # Reset on each analysis run overlaps_by_internal_id = {} for st in repo.regular_statements: effective_compartment = st.get('effective_path', '') or 'n/a' statement_text = st.get('statement_text', '') or 'n/a' policy_overlap = [] additional_notes = '' perm_overlap = [] reason = '' logger.debug( f'Analyzing statement "{statement_text}" in policy {st.get("policy_name")}' f' for overlaps - effective path: {effective_compartment}' ) for other_st in repo.regular_statements: if other_st.get('internal_id', 'N/A') == st.get('internal_id', 'N/A'): continue if other_st.get('subject_type') != st.get('subject_type'): continue if not other_st.get('effective_path'): continue if not effective_compartment.lower().startswith(other_st.get('effective_path', '').lower()): continue other_permissions = other_st.get('permission') or repo.permission_reference_repo.get_permissions( entity=other_st.get('resource', ''), verb=other_st.get('verb', ''), action=other_st.get('action', 'allow'), ) st_permissions = st.get('permission') or repo.permission_reference_repo.get_permissions( entity=st.get('resource', ''), verb=st.get('verb', ''), action=st.get('action', 'allow') ) logger.debug( f'Checking potential overlap(1) between "{st_permissions}" and "{other_permissions}" for statements "{st.get("policy_name")}:{statement_text}" and "{other_st.get("policy_name")}:{other_st.get("statement_text")}"' ) if not other_permissions and not st_permissions: logger.debug( f'Both statements have no permissions, treating as resource-only overlap check: {st.get("policy_name")}:{statement_text} / {other_st.get("policy_name")}:{other_st.get("statement_text")}' ) continue if not other_permissions or not st_permissions: if other_st.get('resource', '').lower() != st.get('resource', '').lower(): continue logger.debug( f'Potential Overlap (resource) based on resource name match: {st.get("policy_name")}:{statement_text} / {other_st.get("policy_name")}:{other_st.get("statement_text")}' ) reason = ( f'Exact match on resource name ({st.get("resource")}), subject_type, and at least one subject, ' f'with broader effective compartment in other policy ({other_st.get("effective_path")})' ) perm_overlap = ['Resource:' + st.get('resource', '')] else: perm_overlap = repo.permission_reference_repo.check_overlap(st_permissions, other_permissions) if len(perm_overlap) == 0: continue reason = ( f'Permission Overlap between resource {st.get("resource")} ' f'and {other_st.get("resource")}, subject_type, and at least one subject, ' f'with broader effective compartment in other policy ({other_st.get("effective_path")})' ) logger.debug( f'Permission Overlap (permission) Check between "{st_permissions}" and "{other_permissions}": {perm_overlap}' ) st_subjects = st.get('subject', []) other_subjects = other_st.get('subject', []) subject_overlap = False for st_subj in st_subjects: for other_subj in other_subjects: st0 = st_subj[0] st1 = st_subj[1] oth0 = other_subj[0] oth1 = other_subj[1] if isinstance(st0, list): st0 = '/'.join(map(str, st0)) if isinstance(st1, list): st1 = '/'.join(map(str, st1)) if isinstance(oth0, list): oth0 = '/'.join(map(str, oth0)) if isinstance(oth1, list): oth1 = '/'.join(map(str, oth1)) if (str(st0) or '').lower() == (str(oth0) or '').lower() and str(st1).lower() == str( oth1 ).lower(): subject_overlap = True break if subject_overlap: break if not subject_overlap: continue logger.debug( f'Potential Overlap: {st.get("policy_name")}:{statement_text} / {other_st.get("policy_name")}:{other_st.get("statement_text")}' ) confidence = 'high' if st.get('conditions') or other_st.get('conditions'): logger.debug( f'Policy Overlap detected with WHERE clause: Statement "{statement_text}" in policy {st.get("policy_name")}' f' is potentially superseded by statement {other_st.get("statement_text")} in policy {other_st.get("policy_name")}' ) confidence = 'medium' additional_notes = ( 'Runtime where clause(s) present in one or both statements may affect actual overlap.' ) policy_overlap.append( PolicyOverlap( superseded_by=other_st['policy_name'], confidence=confidence, reason=reason, statement_text=other_st['statement_text'], internal_id=other_st['internal_id'], permission_overlap=perm_overlap, additional_notes=additional_notes if 'additional_notes' in locals() else '', ) ) if len(policy_overlap) > 0: overlaps_by_internal_id.setdefault(st.get('internal_id'), policy_overlap) logger.debug( f'Policy Overlap(s) found for statement "{statement_text}" in policy {st.get("policy_name")}: {len(policy_overlap)}' ) # Rebuild overlay "overlaps" as list of {'statement_internal_id': ..., 'overlaps': [...] } self.overlay['overlaps'] = [ {'statement_internal_id': k, 'overlaps': v} for k, v in overlaps_by_internal_id.items() ] end_time = time.perf_counter() logger.info( f'Initialized policy_overlap overlay for {len(self.overlay["overlaps"])} statements in {end_time - start_time:.2f} seconds' )
[docs] def get_policy_overlaps_by_internal_id(self, internal_id: str): """ Returns all PolicyOverlap entries for a given policy statement internal ID by looking up the overlay model. """ for entry in self.overlay.get('overlaps', []): if entry.get('statement_internal_id') == internal_id: return entry.get('overlaps', []) return []
[docs] def find_invalid_statements(self): # noqa: C901 """ Mark regular policy statements as invalid if they fail various validity checks, such as: - Nonexistent Dynamic Groups or Groups - Invalid compartment OCIDs - Invalid verbs/resources This method modifies the statements in-place, adding an `invalid_reasons` list if applicable. """ repo = self.policy_repo for st in repo.regular_statements: logger.debug(f'Checking validity for statement: {st.get("statement_text")}') # If parsing errors have already populated invalid_reasons, preserve them invalid_reasons = list(st.get('invalid_reasons', [])) # Dynamic Group check if st.get('subject_type') == 'dynamic-group': for subject in st.get('subject', []): dg_domain = subject[0] or 'default' dg_name = subject[1] # See if this DG exists in our loaded DGs logger.debug(f'Checking DG existence for {dg_domain}/{dg_name}') dg_found = any( dg.get('dynamic_group_name', '').lower() == dg_name.lower() and dg.get('domain_name', 'default').lower() == dg_domain.lower() for dg in repo.dynamic_groups ) if not dg_found: st['valid'] = False invalid_reasons.append(f'Dynamic Group {dg_name} not found in tenancy') logger.debug(f'Dynamic Group {dg_name} not found for statement: {st.get("statement_text")}') # Group check elif st.get('subject_type') == 'group': for subject in st.get('subject', []): group_domain = subject[0] or 'default' group_name = subject[1] logger.debug(f'Checking Group existence for {group_domain}/{group_name}') group_found = any( g.get('group_name', '').lower() == group_name.lower() and g.get('domain_name', 'default').lower() == group_domain.lower() for g in repo.groups ) if not group_found: st['valid'] = False invalid_reasons.append(f'Group {group_name} not found in tenancy') logger.debug(f'Group {group_name} not found for statement: {st.get("statement_text")}') # Location check # location_invalid_reason = repo.check_statement_location_validity(st) # if location_invalid_reason: # st['valid'] = False # invalid_reasons.append(location_invalid_reason) # logger.debug(location_invalid_reason) # Verb check if st.get('verb') and st.get('verb', '').casefold() not in {'inspect', 'read', 'use', 'manage'}: logger.debug(f'Invalid Verb found: {st.get("verb")}') st['valid'] = False invalid_reasons.append(f'Invalid Verb ({st.get("verb")}) found') if len(invalid_reasons) > 0: st['invalid_reasons'] = invalid_reasons
[docs] def calculate_effective_compartment_for_statement(self, st): # noqa: C901 """ Calculate effective compartment OCID and path for a single statement, mutating st in place. Uses indexes built via build_compartment_index(). """ repo = self.policy_repo try: if ( not hasattr(self, 'compartments_by_id') or not hasattr(self, 'compartments_by_path') ) and repo.compartments: logger.info('Compartment indexes not found, building now for effective compartment calculation.') self.build_compartment_index() except Exception as idx_exc: st['effective_path'] = f'(Error building compartment indexes: {idx_exc})' return if not hasattr(self, 'compartments_by_id') or not hasattr(self, 'compartments_by_path'): st['effective_path'] = '(Compartments not loaded or indexes unavailable)' return logger.debug(f"-Statement: {st.get('statement_text')}") # Case 1 - in tenancy if st.get('location_type') == 'tenancy': st['effective_compartment_ocid'] = repo.tenancy_ocid st['effective_path'] = self._name_path_from_ocid(repo.tenancy_ocid) if st['effective_path']: st['effective_path'] = st['effective_path'].lower() logger.debug(f"Effective (ten) path for {st.get('statement_text')}: {st.get('effective_path')}") # Case 2 - Compartment ID elif st.get('location_type') == 'compartment id': st['effective_compartment_ocid'] = st.get('location') st['effective_path'] = self._name_path_from_ocid(st.get('location')) if st['effective_path']: st['effective_path'] = st['effective_path'].lower() st.setdefault('parsing_notes', []).append('Compartment ID used for location') logger.debug(f"Effective (id) path for {st.get('statement_text')}: {st.get('effective_path')}") # Case 3 - Compartment Name (with or without full path) else: logger.debug(f"Need to calc eff path for {st.get('statement_text')}") location = st.get('location') parts = [p.strip() for p in location.split(':') if p.strip()] if location else [] policy_path = self._name_path_from_ocid(st.get('compartment_ocid')) logger.debug(f'Policy Path: {policy_path} / Location parts: {parts}') eff_path = policy_path logger.debug(f'Initial effective path: {eff_path}') logger.debug(f"Compartment OCID for policy: {st.get('compartment_ocid')}") comp_name = self._comp_name_path_ocid(st.get('compartment_ocid')) logger.debug(f'Compartment name for compare: {comp_name}') if parts and parts[0].casefold() == (comp_name.casefold() if comp_name else ''): st.setdefault('parsing_notes', []).append('Deleted compartment from effective location') del parts[0] for p in parts: if eff_path is None: eff_path = '' eff_path += f'/{p}' if eff_path: eff_path = eff_path.lower() logger.debug(f"Effective (loc) path for {st.get('statement_text')}: {eff_path}") st['effective_path'] = eff_path st['effective_compartment_ocid'] = self.compartments_by_path.get(eff_path, {}).get('id')
[docs] def calculate_all_effective_compartments(self): """ Resolve effective compartment for all statements. Loop through all statements and calculate. """ for st in self.policy_repo.regular_statements: logger.debug(f'Calculating effective compartment for statement: {st.get("statement_text")}') self.calculate_effective_compartment_for_statement(st)
[docs] def build_compartment_index(self): """ Build quick-lookup structures for resolving compartment names and parent/child relationships used by calculate_effective_compartment_for_statement(). """ repo = self.policy_repo self.compartments_by_id = {} self.compartments_by_path = {} self.children_by_parent = {} logger.info( f'Building compartment indexes for effective compartment resolution. Compartments loaded: {len(repo.compartments)}' ) for comp in repo.compartments: cid = comp.get('id') name = comp.get('name') parent_id = comp.get('parent_id') or repo.tenancy_ocid path = comp.get('hierarchy_path') logger.debug(f'***Path is {path}') self.compartments_by_id[cid] = { 'name': name, 'path': path, 'parent_id': parent_id, } if path: self.compartments_by_path[path] = {'id': cid, 'name': name} self.children_by_parent.setdefault(parent_id, {})[name] = cid # TODO: Add these to debugger_tab as options logger.debug(f'Compartment by ID index: {self.compartments_by_id}') logger.debug(f'Compartment by Path index: {self.compartments_by_path}') logger.debug(f'Children by Parent index: {self.children_by_parent}') logger.info( f'Built compartment index: {len(self.compartments_by_id)} compartments, ' f'{len(self.children_by_parent)} parents with children.' )
def _name_path_from_ocid(self, ocid: str): logger.debug(f"Lookup details: {getattr(self, 'compartments_by_id', {})}") comp = getattr(self, 'compartments_by_id', {}).get(ocid) return comp.get('path') if comp else None def _comp_name_path_ocid(self, ocid: str): comp = getattr(self, 'compartments_by_id', {}).get(ocid) return comp.get('name') if comp else None def _check_invalid_location(self, compartment_ocid) -> bool: """ Returns False if the given compartment_ocid is not an active compartment (according to OCI). """ repo = self.policy_repo try: comp = repo.identity_client.get_compartment(compartment_id=compartment_ocid).data if comp.lifecycle_state == 'ACTIVE': return True else: logger.warning(f'Found Compartment but not ACTIVE: {compartment_ocid} was: {comp.lifecycle_state}') return False except Exception as e: logger.debug(f'Compartment OCID {compartment_ocid} not valid: {e}') return False
[docs] def calculate_potential_risk_scores(self, where_clause_reduction_pct=50): # noqa: C901 """ Calculates potential risk scores for each policy statement based on permission risk values (from reference data) and multiplies by a compartment exposure factor based on scope in the compartment hierarchy. Applies a raw score reduction for statements with a WHERE clause ('conditions'), adjustable as a percentage. Results are stored in self.overlay["risk_scores"] as a list of dicts: { "statement_internal_id", "score", "notes", "recommendations" } """ logger.info( 'Calculating potential risk scores for all policy statements with where_clause_reduction_pct=%s', where_clause_reduction_pct, ) repo = self.policy_repo ref_repo = repo.permission_reference_repo # Ensure compartment index is built if not hasattr(self, 'compartments_by_path') or not self.compartments_by_path: self.build_compartment_index() risk_scores = [] for st in repo.regular_statements: internal_id = st.get('internal_id') verb = st.get('verb', '').lower() resource = st.get('resource', '').lower() permissions = st.get('permission', []) effective_path = st.get('effective_path', '') or '' compartment_exposure = 1 # Compute permission risk sum if permissions: perm_risk_base = ref_repo.get_permissions_risk_sum(permissions, resource) risk_detail = f"Sum of permission risks ({', '.join(permissions)}): {perm_risk_base}" else: verb_risk_map = {'inspect': 1, 'read': 2, 'use': 10, 'manage': 50} perm_risk_base = ref_repo.get_verb_resource_risk(verb, resource) if perm_risk_base == 0: is_family = '-family' in resource if verb == 'inspect': base = 1 * (2 if is_family else 1) expl = f'inspect verb base ({base})' elif verb == 'read': base = 2 * (2 if is_family else 1) expl = f'read verb base ({base})' else: base = 2 if is_family else 1 expl = f'default base ({base})' risk_factor = verb_risk_map.get(verb, 1) perm_risk_base = base * risk_factor risk_detail = f'No permissions for verb/resource ({verb}, {resource}): rubric base {base}*verb_mult{risk_factor}={perm_risk_base} ({expl})' else: risk_detail = f'Verb/resource risk for ({verb}, {resource}): {perm_risk_base}' notes = [risk_detail] recommendations = [] # Compartment exposure: count all subcompartments-in-scope (including self) path_lower = effective_path.lower() exposure_count = 0 scope_label = '(path unknown)' if path_lower: for other_path in self.compartments_by_path or {}: if other_path and other_path.lower().startswith(path_lower): exposure_count += 1 if exposure_count == 0: exposure_count = 1 scope_label = f'Effective path: {effective_path}, Exposure compartments covered: {exposure_count}' else: exposure_count = 1 scope_label = 'Scope unknown: exposure x1' compartment_exposure = exposure_count notes.append(scope_label) total_risk = perm_risk_base * compartment_exposure notes.append(f'Final potential risk: {perm_risk_base} x {compartment_exposure} = {total_risk}') # WHERE clause reduction and recommendation has_where_clause = bool(st.get('conditions')) if has_where_clause: reduction_pct = ( where_clause_reduction_pct if isinstance(where_clause_reduction_pct, int | float) else 50 ) reduced_risk = int(total_risk * (1.0 - (reduction_pct / 100.0))) notes.append(f'WHERE clause present: raw risk reduced by {reduction_pct}% to {reduced_risk}.') total_risk = reduced_risk recommendations.append( 'Test and tighten where clause definition to reduce policy statement blast radius' ) # Generate other recommendations (least privilege, scope reduction, etc.) path_is_root = effective_path.lower() == 'root' is_family = resource in (ref_repo.data.get('families', {}).keys()) is_many_perms = (len(permissions) or 0) > 8 or perm_risk_base > 100 if verb == 'manage' and path_is_root: recommendations.append( 'High risk: Statement grants MANAGE at tenancy root. Recommend scoping to sub-compartment or to a specific resource where possible.' ) if is_family and verb in {'use', 'manage'} and compartment_exposure > 10: recommendations.append( "Consider using 'read' for entire families/resources and reserving 'use' or 'manage' for narrow/specific resources or compartments." ) if is_many_perms and compartment_exposure > 5: recommendations.append( "Permissions grant wide access. Consider reducing permission set and limiting the statement's compartment scope." ) if verb == 'manage': recommendations.append( "For most scenarios, prefer 'use' for practical operations and 'manage' only when administrative actions are justified." ) if not recommendations and total_risk > 100: recommendations.append('Review scope and permissions for potential risk reduction.') logger.debug( 'Risk score for statement %s: base=%s, exposure=%s, total=%s. Notes: %s Recommendations: %s', internal_id, perm_risk_base, compartment_exposure, total_risk, '; '.join(notes), recommendations, ) risk_scores.append( { 'statement_internal_id': internal_id, 'score': int(total_risk), 'notes': ' '.join(notes), 'recommendations': recommendations, } ) self.overlay['risk_scores'] = risk_scores logger.info(f'Calculated risk scores for {len(risk_scores)} statements.')
[docs] def build_cleanup_items(self): """ Analyze policy repository and collect actionable cleanup items for all key risk categories. This should be called BEFORE build_overall_recommendations. The lists are attached to self.overlay["cleanup_items"]. """ repo = self.policy_repo # (1) Invalid Policy Statements invalid_statements = [st for st in repo.regular_statements if st.get('invalid_reasons')] # (2) Unused Groups unused_groups = [group for group in repo.groups if not repo.get_users_for_group(group)] # (3) Unused Dynamic Groups self.run_dg_in_use_analysis() # ensure DG in_use fields are updated unused_dgs = repo.filter_dynamic_groups({'in_use': [False]}) # (4) Overly broad manage all-resources statements_too_open = [ st for st in repo.regular_statements if ( st.get('verb', '').lower() == 'manage' and st.get('resource', '').lower() == 'all-resources' and st.get('policy_name', '') != 'Tenant Admin Policy' ) ] # (5) Any-user without where anyuser_no_where = [ st for st in repo.regular_statements if st.get('subject_type') == 'any-user' and not st.get('conditions') ] self.overlay['cleanup_items'] = { 'invalid_statements': invalid_statements, 'unused_groups': unused_groups, 'unused_dynamic_groups': unused_dgs, 'statements_too_open': statements_too_open, 'anyuser_no_where': anyuser_no_where, }
[docs] def build_overall_recommendations(self): """ Build the overall (user-facing) recommendations list for overlay["recommendations"]. Each recommendation summarizes the count of existing actionable issues. Assumes build_cleanup_items has already been called and populated "cleanup_items". If no real recommendations are found, yields one informational finding as a placeholder. Example of recommendation dict:: { 'Recommendation': 'Investigate invalid policy statements', 'Priority': 'High', 'Category': 'Policy Hygiene', 'Notes': '3 invalid policy statement(s) detected. Review the cleanup/fix tab for details.', 'Action': 'Plan: Review and remediate invalid policy statements', 'ActionDetail': 'Examine policies with invalid statements and resolve as appropriate.', } This could change in the future to include risk score-based recommendations. """ cleanup = self.overlay.get('cleanup_items', {}) recommendations = [] if cleanup.get('invalid_statements'): recommendations.append( { 'Recommendation': 'Investigate invalid policy statements', 'Priority': 'High', 'Category': 'Policy Hygiene', 'Notes': f"{len(cleanup['invalid_statements'])} invalid policy statement(s) detected. Review the cleanup/fix tab for details.", 'Action': 'Plan: Review and remediate invalid policy statements', 'ActionDetail': 'Examine policies with invalid statements and resolve as appropriate.', } ) if cleanup.get('unused_groups'): recommendations.append( { 'Recommendation': 'Ensure all groups are needed; consider removing unused groups', 'Priority': 'Medium', 'Category': 'Identity Management', 'Notes': f"{len(cleanup['unused_groups'])} unused group(s) (0 members) detected. See cleanup/fix tab for actionable list.", 'Action': 'Plan: Remove or repurpose unused groups', 'ActionDetail': 'Review business need for empty groups and remove unless justified.', } ) if cleanup.get('unused_dynamic_groups'): recommendations.append( { 'Recommendation': 'Clean up unused Dynamic Groups', 'Priority': 'Medium', 'Category': 'Identity Management', 'Notes': f"{len(cleanup['unused_dynamic_groups'])} unused dynamic group(s) detected. See cleanup/fix tab for actionable list.", 'Action': 'Plan: Remove unused dynamic groups', 'ActionDetail': 'Delete or reassign dynamic groups not referenced in policy statements.', } ) if cleanup.get('statements_too_open'): recommendations.append( { 'Recommendation': 'Tighten policies that are too open (manage all-resources)', 'Priority': 'High', 'Category': 'Access Scope', 'Notes': f"{len(cleanup['statements_too_open'])} policy statement(s) granting 'manage all-resources' broadly detected. See cleanup/fix tab for details.", 'Action': "Plan: Restrict broad 'manage all-resources' statements", 'ActionDetail': 'Replace with least privilege and restrict to smallest viable compartment and subject.', } ) if cleanup.get('anyuser_no_where'): recommendations.append( { 'Recommendation': 'Always limit any-user statements with a concise where clause', 'Priority': 'High', 'Category': 'Access Scope', 'Notes': f"{len(cleanup['anyuser_no_where'])} policy statement(s) with 'any-user' subject and no where clause detected. See cleanup/fix tab for details.", 'Action': 'Plan: Add where clauses to any-user statements', 'ActionDetail': 'Enforce least privilege by specifying a concise where clause for all any-user policies.', } ) # Existing logic for critical recommendations (unchanged) repo = self.policy_repo for st in repo.regular_statements: policy_name = st.get('policy_name', '') verb = st.get('verb', '').lower() resource = st.get('resource', '').lower() if st.get('effective_path', '') is None: logger.info(f'Skipping statement with undefined effective_path (fix this): {st.get("statement_text")}') # Lets recommend fixing this first recommendations.append( { 'Recommendation': 'Statement has undefined effective path', 'Priority': 'High', 'Category': 'Compartment Resolution', 'Notes': f'Statement in policy {policy_name} has no effective path calculated. Ensure compartments are loaded and statement locations are valid.', 'Action': 'Plan: Review statement locations', 'ActionDetail': f"Check statement: '{st.get('statement_text','')}' in policy '{policy_name}' for location issues.", } ) continue eff_path = st.get('effective_path', '').lower() conditions = st.get('conditions') if ( policy_name != 'Tenant Admin Policy' and verb == 'manage' and resource == 'all-resources' and eff_path == 'root' and not conditions ): recommendations.append( { 'Recommendation': 'Review non-admin statement that grants manage all-resources at root', 'Priority': 'Critical', 'Category': 'Policy Scope', 'Notes': f'Policy {policy_name} grants manage all-resources at root with no conditions. Consider limiting scope or adding conditions.', 'Action': 'Plan: Restrict scope for manage all-resources', 'ActionDetail': f"Work with compartment admins to restrict '{policy_name}' or replace 'manage all-resources' with least privilege.", } ) # Guarantee at least one recommendation so UI never appears empty: if not recommendations: recommendations.append( { 'Recommendation': 'No critical recommendations detected.', 'Priority': 'Info', 'Category': 'General', 'Notes': 'No critical risks found in current policy set.', 'Action': 'No action needed', 'ActionDetail': 'No action is required at this time.', } ) self.overlay['recommendations'] = recommendations # Log summary critical_count = sum(1 for r in recommendations if r.get('Priority') == 'Critical') high_count = sum(1 for r in recommendations if r.get('Priority') == 'High') logger.info( f'Built overall recommendations: {len(recommendations)} total, {critical_count} critical, {high_count} high.' )
[docs] def build_policy_consolidation(self): """ Analyze policies/statements for possible consolidation opportunities and populate overlay["consolidations"] with a list of dicts:: { "Statement": ..., "Policy Name(s)": ..., "Compartment": ..., "Principal": ..., "Service/Resource": ..., "Consolidation Reason": ..., "Action": ..., # always present now "ActionDetail": ... # optional, for detail/planning dialog } Criteria: 1. Policies with only a single statement (likely consolidation candidate). 2. Statements with identical principal, compartment, and service/resource, but split across multiple differently-named policies (should suggest merge). """ repo = self.policy_repo consolidation_findings = [] # 1. Policies with only a single statement policy_statement_count = {} policy_statements = {} for st in repo.regular_statements: pol = st.get('policy_name', '') policy_statement_count.setdefault(pol, 0) policy_statement_count[pol] += 1 policy_statements.setdefault(pol, []).append(st) for pol, count in policy_statement_count.items(): if count == 1: st = policy_statements[pol][0] statement_text = st.get('statement_text', '') compartment = st.get('effective_path', '') principal = _principal_str(st) resource = st.get('resource', '') consolidation_findings.append( { 'Statement': statement_text, 'Policy Name(s)': pol, 'Compartment': compartment, 'Principal': principal, 'Service/Resource': resource, 'Consolidation Reason': f'Policy {pol} has only one statement; consider consolidation if other similar policies exist.', 'Action': f"Plan: Review and possibly merge '{pol}' into another policy with similar principal or scope.", 'ActionDetail': f"Review statement '{statement_text}' in policy '{pol}' for merge candidates.", } ) # 2. Same principal, compartment, and resource/service spread across policies combo_index = {} for st in repo.regular_statements: principal = _principal_str(st) compartment = st.get('effective_path', '') resource = st.get('resource', '') combo = (principal, compartment, resource) if not all(combo): continue combo_index.setdefault(combo, []) combo_index[combo].append(st) for combo, sts in combo_index.items(): # If applies to more than one unique policy policy_names = {st.get('policy_name', '') for st in sts} if len(sts) > 1 and len(policy_names) > 1: statement_texts = '; '.join([st.get('statement_text', '') for st in sts]) consolidation_findings.append( { 'Statement': statement_texts, 'Policy Name(s)': ', '.join(sorted(policy_names)), 'Compartment': combo[1], 'Principal': combo[0], 'Service/Resource': combo[2], 'Consolidation Reason': 'Multiple policies found with same principal, resource/service, and compartment; recommend merge for clarity.', 'Action': f"Plan: Consolidate policies {', '.join(sorted(policy_names))} into one.", 'ActionDetail': f'Statements: {statement_texts}.\nEvaluate details and propose a single policy.', } ) # Always append a sample/demo row at the end for display testing. consolidation_findings.append( { 'Statement': '[Sample] Consolidation not yet implemented: demo stub row', 'Policy Name(s)': '[demo]', 'Compartment': '[sample]', 'Principal': '[sample]', 'Service/Resource': '[sample]', 'Consolidation Reason': 'Demo: Consolidation engine stubbed/not implemented yet.', 'Action': 'Not yet implemented', 'ActionDetail': 'Policy consolidation is not implemented in this version. This is a stub/demo entry for UI and engine plumbing.', } ) self.overlay['consolidations'] = consolidation_findings
def _principal_str(st): typ = st.get('subject_type') or '' subs = st.get('subject') or [] # Expect list of tuples; join for display if not subs: return typ sub_list = [] for sub in subs: # tuple or list of two: (domain, name) if isinstance(sub, tuple | list): domain, name = sub if len(sub) == 2 else ('', '') if domain and domain != 'default': sub_list.append(f'{typ}:{domain}/{name}') else: sub_list.append(f'{typ}:{name}') elif isinstance(sub, str): sub_list.append(f'{typ}:{sub}') else: sub_list.append(f'{typ}:{str(sub)}') return ', '.join(sub_list)