OCI Policy Analysis Data and Logic
Policy Repository
- class oci_policy_analysis.logic.data_repo.PolicyAnalysisRepository[source]
Bases:
objectThis is the main data repository for Policy, Identity, and Compartment data
During initialization, the entire compartment hierarchy and policy tree is loaded into a central JSON dictionary. This central dictionary is then referenced by functions that filter and return a subset of information for display. Parsing, additional analysis, and import/export are made available by additional functions exposed.
Loading of data starts from load_policies_and_compartments, which loads all compartments and policies recursively
Filtering functions return lists of dataclass objects defined in models.py for easy consumption by UI or CLI layers.
See filter_policy_statements for an example of filtering and returning PolicyStatement objects.
- reset_state()[source]
Resets all main state variables (lists, dictionaries, flags, clients, IDs, etc.). Call this before any data (re)load operation for a clean repository state.
- initialize_client(use_instance_principal: bool, session_token: str | None = None, recursive: bool = True, profile: str = 'DEFAULT') bool[source]
Initializes the OCI client to be used for all data operations
Client can be loaded using PROFILE or Instance Principal authentication methods
- Parameters:
use_instance_principal – Whether to attempt Instance Principal signer-based authentication
recursive – Whether to load tenancy data across all compartments, or simply the root (tenancy) compartment
session – The named OCI Session Token Profile to use - must be present on the file system in the standard OCI location of .oci/config
profile – The named OCI Profile to use - must be present on the file system in the standard OCI location of .oci/config
- Returns:
A boolean indicating whether the client was created successfully. False indicates that an unrecoverable issue occurred setting up the client.
- check_statement_location_validity(st)[source]
Checks if the compartment location for a statement is valid (exists and is ACTIVE).
- Parameters:
st – The policy statement (dict).
- Returns:
None if valid; string message if invalid.
- load_policies_and_compartments() bool[source]
Optimized bulk loading of all compartments and all policies using OCI Clients.
Fetch compartments (hierarchy, flat)
Fetch policies (threaded fetch/parse) from OCI Resource Search
No queue or milestone progress emission
- load_complete_identity_domains(load_all_users: bool = True) bool[source]
Loads everything into the cetntral JSON
Identity Domains are loaded via the Identity Client. For each Identity Domain, load the Dynamic Groups, Groups, and Users
- Parameters:
load_all_users (bool) – If False, skip loading users. Default is True (backwards compatible).
- Returns:
A boolean indicating success of the data load. False indicates there was some failure in loading data, so it may be incomplete.
- filter_policy_statements(filters: PolicySearch) list[RegularPolicyStatement][source]
Filter policy statements by one or more criteria.
- Parameters:
filters (PolicySearch) – Dictionary of filter keys and their values (e.g. verb, resource, permission, group, etc).
- Returns:
List of statements matching the filter.
- Return type:
list[PolicyStatement]
- filter_cross_tenancy_policy_statements(alias_filter: list[str]) list[RegularPolicyStatement][source]
Filter cross-tenancy policy statements containing any provided alias.
- get_users_for_group(group: Group) list[User][source]
Return all users that belong to the specified exact group. Membership is determined by matching the group name and domain name.
- Parameters:
group (Group) – A dictionary with keys: - ‘domain’: str | None - ‘name’: str
- Returns:
A list of Users that belong to the specified group. If the group does not exist or has no members, returns an empty list.
- Return type:
list[User]
- get_groups_for_user(user: User) list[Group][source]
Return the list of all Groups that a user is a member of
- Parameters:
user (User) – The user to find groups for.
- Returns:
A list of Groups that the user is a member of.
- Return type:
list[Group]
- filter_groups(group_filter: GroupSearch) list[Group][source]
Filter groups based on the provided filter. Public function used by MCP or UI
- filter_users(user_filter: UserSearch) list[User][source]
Filter users based on the provided filter.
This function is used by the MCP interface and the UI.
- Parameters:
user_filter (UserSearch) –
A dictionary with optional keys.
domain_name(list[str]): Domain names to filter by (case-insensitive).search(list[str]): Search terms to match against usernames and display names (case-insensitive).user_ocid(list[str]): User OCIDs to filter by (case-insensitive).
- Returns:
Users that match the filter criteria.
Each
Useris represented as a dictionary with keys:domain_name(str | None): Domain name of the user.user_name(str): Username.user_ocid(str): OCID of the user.display_name(str): Display name of the user.email(str): Email of the user.user_id(str): Internal ID of the user.groups(list[str]): Group OCIDs the user belongs to.
- Return type:
list[User]
- filter_dynamic_groups(filters: DynamicGroupSearch) list[DynamicGroup][source]
Filter dynamic groups using JSON-based filters.
- Parameters:
filters (DynamicGroupSearch) –
A mapping of filter keys to one or more values.
OR: multiple values within a field act as logical OR.
AND: multiple fields are combined as logical AND.
Supported keys: *
domain_name→ matches “Domain” *dynamic_group_name→ matches “DG Name” *matching_rule→ matches “Matching Rule” *dynamic_group_ocid→ matches “DG OCID” *in_use→ matches “In Use” (True/False)- Returns:
A list of dynamic groups that satisfy the filters.
Each dynamic group is represented as a dictionary with keys: *
domain_name(str | None): The domain name of the dynamic group. *dynamic_group_name(str): The name of the dynamic group. *dynamic_group_id(str): The ID of the dynamic group. *dynamic_group_ocid(str): The OCID of the dynamic group. *matching_rule(str): The matching rule of the dynamic group. *description(str): The description of the dynamic group. *in_use(bool): Whether the dynamic group is in use. *creation_time(str): The creation timestamp of the dynamic group. *created_by_name(str): The name of the user who created the dynamic group. *created_by_ocid(str): The OCID of the user who created the dynamic group.- Return type:
list[DynamicGroup]
- Raises:
ValueError – If an unknown filter key is provided.
- load_from_compliance_output_dir(dir_path: str, load_all_users: bool = True) bool[source]
Load all compartments, domains, groups, users, dynamic groups, and policies from compliance tool output files.
Starts with domains, then dynamic groups, then users/groups/membership, then compartments, then policies. This function is for offline/compliance output analysis: no attempt to initialize any OCI client.
Reference Data Repository
GenAI Integration
- class oci_policy_analysis.logic.ai_repo.AI[source]
Bases:
objectAI Module for OCI Policy Analysis
Contains all of the available GenAI calls that can be made to obtain additional context.
- genai_client
The OCI GenAI Client.
- genai_inference_client
The OCI GenAI Inference Client
- initialize_client(use_instance_principal: bool, session_token: str | None = None, profile: str = 'DEFAULT') bool[source]
Initialize OCI GenAI Client with authentication. :param use_instance_principal: Whether to use Instance Principal authentication. :type use_instance_principal: bool :param session_token: Session token profile name for authentication. :type session_token: str | None :param profile: Profile name for authentication if not using instance principal or session token. :type profile: str
- Returns:
True if initialization is successful, False otherwise.
- Return type:
- update_config(model_ocid, endpoint, compartment_ocid)[source]
Update Model ID and Endpoint, reinitializing client if endpoint changes. :param model_ocid: The model OCID to use. :type model_ocid: str :param endpoint: The endpoint URL to use. :type endpoint: str :param compartment_ocid: The compartment OCID for requests. :type compartment_ocid: str
- list_models() list[dict][source]
List available models using GenerativeAiClient.list_models. :returns: A list of available GenAI models with their details. :rtype: list[dict]
- async analyze_policy_statement(policy_text: str, format: str = 'Markdown', queue: Queue | None = None, additional_instruction: str = '')[source]
Analyze a policy statement using the GenAI Inference Client.
- Parameters:
policy_text (str) – The policy statement text to analyze.
format (str) – The result format to use. “Markdown” means the GenAI should return markdown, “Text” means plain text using newlines for paragraphs and no bullet points, no markdown, no HTML.
queue (queue.Queue|None) – Optional queue to put the result into for async calls.
additional_instruction (str) – Additional instructions to include in the prompt.
- Returns:
The analysis result from the AI model.
- Return type:
Policy Intelligence & Analytics
- class oci_policy_analysis.logic.policy_intelligence.PolicyIntelligenceEngine(policy_repo)[source]
Bases:
objectProvides post-load intelligence, overlap analysis, and advanced policy insights on OCI policies.
This engine operates on a loaded PolicyAnalysisRepository and delivers advanced analytics such as risk scores, overlaps, recommendations, and policy hygiene findings.
- Parameters:
policy_repo (PolicyAnalysisRepository) – The repository containing loaded compartment, policy, and identity data.
- policy_repo
Source of OCI policy, identity, and compartment data.
- Type:
PolicyAnalysisRepository
- overlay
Stores the full results of all analytics (risk, overlaps, recommendations, etc).
- Type:
PolicyIntelligence
- permissions_report
Holds the effective permissions report used by various UI components.
- Type:
- build_permissions_report()[source]
Build a detailed nested report of effective permissions for all policy statements.
Scans all statements, resolves permissions and subjects, and groups by effective path and subject. Stores both the structured report and supporting lookup maps in self.permissions_report.
- Returns:
A structure with keys “report”, “resource_map”, “perm_conditionals”, and “perm_statements” containing all effective allow/deny permissions and metadata, or empty dicts if no data is present.
- Return type:
- run_dg_in_use_analysis()[source]
Analyzes Dynamic Group data for unused Dynamic Groups. Should be called after repo is loaded and statements parsed.
- analyze_policy_overlap()[source]
Analyze policy statements for potential overlaps, calling after all statements loaded/parsed. Stores result in self.overlay[“overlaps”] using the new PolicyIntelligence overlay structure.
- get_policy_overlaps_by_internal_id(internal_id: str)[source]
Returns all PolicyOverlap entries for a given policy statement internal ID by looking up the overlay model.
- find_invalid_statements()[source]
Mark regular policy statements as invalid if they fail various validity checks, such as: - Nonexistent Dynamic Groups or Groups - Invalid compartment OCIDs - Invalid verbs/resources
This method modifies the statements in-place, adding an invalid_reasons list if applicable.
- calculate_effective_compartment_for_statement(st)[source]
Calculate effective compartment OCID and path for a single statement, mutating st in place. Uses indexes built via build_compartment_index().
- calculate_all_effective_compartments()[source]
Resolve effective compartment for all statements. Loop through all statements and calculate.
- build_compartment_index()[source]
Build quick-lookup structures for resolving compartment names and parent/child relationships used by calculate_effective_compartment_for_statement().
- calculate_potential_risk_scores(where_clause_reduction_pct=50)[source]
Calculates potential risk scores for each policy statement based on permission risk values (from reference data) and multiplies by a compartment exposure factor based on scope in the compartment hierarchy. Applies a raw score reduction for statements with a WHERE clause (‘conditions’), adjustable as a percentage.
- Results are stored in self.overlay[“risk_scores”] as a list of dicts:
{ “statement_internal_id”, “score”, “notes”, “recommendations” }
- build_cleanup_items()[source]
Analyze policy repository and collect actionable cleanup items for all key risk categories. This should be called BEFORE build_overall_recommendations. The lists are attached to self.overlay[“cleanup_items”].
- build_overall_recommendations()[source]
Build the overall (user-facing) recommendations list for overlay[“recommendations”]. Each recommendation summarizes the count of existing actionable issues. Assumes build_cleanup_items has already been called and populated “cleanup_items”. If no real recommendations are found, yields one informational finding as a placeholder.
Example of recommendation dict:
{ 'Recommendation': 'Investigate invalid policy statements', 'Priority': 'High', 'Category': 'Policy Hygiene', 'Notes': '3 invalid policy statement(s) detected. Review the cleanup/fix tab for details.', 'Action': 'Plan: Review and remediate invalid policy statements', 'ActionDetail': 'Examine policies with invalid statements and resolve as appropriate.', }
This could change in the future to include risk score-based recommendations.
- build_policy_consolidation()[source]
Analyze policies/statements for possible consolidation opportunities and populate overlay[“consolidations”] with a list of dicts:
{ "Statement": ..., "Policy Name(s)": ..., "Compartment": ..., "Principal": ..., "Service/Resource": ..., "Consolidation Reason": ..., "Action": ..., # always present now "ActionDetail": ... # optional, for detail/planning dialog }
- Criteria:
Policies with only a single statement (likely consolidation candidate).
Statements with identical principal, compartment, and service/resource, but split across multiple differently-named policies (should suggest merge).
Simulation Engine
- class oci_policy_analysis.logic.simulation_engine.PolicySimulationEngine(policy_repo=None, ref_data_repo=None)[source]
Bases:
objectCentral simulation service for OCI policies.
— Stateless simulation setup APIs (recommended for UI/MCP) —
These methods provide the canonical way to derive principal keys, resolve all applicable policy statements, and compute required where-clause parameter names, regardless of UI vs MCP flow.
- Step 1: Normalize compartment_path and principal_type/principal into principal_key (handles tuple/string, None/’default’, etc).
-> Use normalize_principal_key(principal_type, principal)
- Step 2: Find all statements for a context (compartment_path, principal_type, principal).
-> Use find_applicable_statements_for_context(compartment_path, principal_type, principal)
- Step 3: Get all required where-clause variable names for a context.
-> Use get_required_where_fields_for_context(compartment_path, principal_type, principal)
The MCP version always uses all valid applicable statements for a context.
Responsibilities: - Given principal + effective compartment, generate the list of all applicable policy statements (including ‘any-user’). - Build and cache a mapping of compartment -> principal -> [statements] for fast lookup. - For a given policy context, extract all required where-clause fields, and evaluate conditional logic against input values. - For a set of user choices (principal + compartment + where values), return the effective permission set and permission simulation trace. - Fold in/replace all old where simulation logic found elsewhere (single source of truth). - DO NOT load or store raw data — consumption only.
- Primary entrypoints:
get_applicable_statements(principal, compartment): List[dict]
get_required_where_fields(statements): Set[str]
simulate_permissions(principal, compartment, where_context: dict): (Set[str], List[dict])
simulate_api_operation(principal, compartment, operation, where_context: dict): dict (YES/NO + full trace)
- get_statements_for_context(compartment_path, principal_type, principal=None)[source]
Canonical: Return (principal_key, [statement dicts]) for a given policy simulation context (UI/MCP).
- Parameters:
- Returns:
Tuple with normalized principal key and list of matching statements.
- Return type:
(principal_key, statements)
See docs/source/simulation_engine_usage_examples.py for usage.
- get_required_where_fields_for_context(compartment_path, principal_type, principal=None)[source]
Canonical: Given a context, return (principal_key, set of required where input variables).
- Parameters:
- Returns:
Tuple with normalized principal key and set of all required where fields.
- Return type:
(principal_key, required_fields)
See docs/source/simulation_engine_usage_examples.py for usage.
- static extract_variable_names(cond_str)[source]
Extract variable names from a policy condition string using the OCI IAM Policy Condition parser.
- simulate_and_record(principal_key: str, effective_path: str, api_operation: str, where_context: dict, checked_statement_ids: list[str] | None = None, trace_name: str | None = None, trace: bool = False) dict[str, Any][source]
Simulates permissions for a principal, compartment, and operation, recording full trace/results.
- Parameters:
principal_key (str) – The unique principal key (e.g., “group:default/Admins”) for the simulation.
effective_path (str) – The effective compartment path for simulation.
api_operation (str) – The API operation to check permission for.
where_context (dict) – Input/context for any evaluated ‘where’ policy clauses.
checked_statement_ids (Optional[list[str]]) – List of policy statement internal_ids to include in simulation. If omitted or None, all applicable statements for the context (principal_key and effective_path) are used.
trace_name (str, optional) – Optional name for this simulation trace/history.
trace (bool, optional) – If True, includes detailed step-by-step trace; else, summary only.
- Returns:
- Full simulation result with yes/no, permissions, missing/revoked, per-statement decision trace,
and trace object suitable for history review.
- Return type:
Example
result = engine.simulate_and_record(‘group:default/Admins’, ‘ROOT’, ‘oci:ListBuckets’, {}, checked_ids, trace=True) result = engine.simulate_and_record(‘group:default/Admins’, ‘ROOT’, ‘oci:ListBuckets’, {}, trace=True) # auto selects statements
- get_simulation_trace_list() list[dict[str, str]][source]
Get the list of names and timestamps for all recorded simulation traces.
- Returns:
- List of dictionaries with keys ‘name’ and ‘timestamp’
corresponding to each simulation history entry.
- Return type:
Example
[{‘name’: ‘Simulation 1’, ‘timestamp’: ‘2026-01-05T14:00:01’}, …]
- get_simulation_trace_by_index(idx: int) dict[str, Any] | None[source]
Retrieve a previously recorded simulation trace result by history index.
- Parameters:
idx (int) – Index into simulation history (0 = oldest).
- Returns:
The trace result dict, or None if index is out of range.
- Return type:
Example
>>> engine.get_simulation_trace_by_index(0) {'result': 'YES', ...}
- get_simulation_trace_by_name(name: str) dict[str, Any] | None[source]
Retrieve a simulation trace result by name label.
- get_api_operations(filter_text: str = '') list[source]
Get all known API operation names, optionally filtered by substring.
- Parameters:
filter_text (str, optional) – Substring to filter operation names (case-insensitive). Defaults to ‘’ (all ops).
- Returns:
Sorted list of API operation names.
- Return type:
Example
>>> engine.get_api_operations('bucket') ['oci:ListBuckets', 'oci:CreateBucket', ...]
- get_applicable_statements(principal_key: str, effective_path: str) list[dict][source]
Get all applicable policy statements for a principal and compartment by building a proper PolicySearch filter using the principal_key and effective_path.
- Parameters:
- Returns:
List of matching policy statement dicts.
- Return type:
List[dict]
Notes
Used by all canonical orchestration flows (UI/MCP)
See usage examples in docs/source/simulation_engine_usage_examples.py
- get_required_where_fields(statements: list[dict]) set[str][source]
Extract all unique “where clause” input field names needed by a set of policy statements.
- Parameters:
statements (list[dict]) – The statements to examine for conditional/where fields.
- Returns:
Set of all unique field names required as variables for condition evaluation.
- Return type:
Example
>>> fields = engine.get_required_where_fields(statements) >>> print(fields) # e.g. {'request.time', 'user.department'}