OCI Policy Analysis Data and Logic

Policy Repository

class oci_policy_analysis.logic.data_repo.PolicyAnalysisRepository[source]

Bases: object

This is the main data repository for policy, identity, and compartment data.

During initialization, the entire compartment hierarchy and policy tree is loaded into a central JSON dictionary. Filter functions then reference this central dictionary and return a subset of its information for display. Additional functions provide parsing, further analysis, and import/export.

Loading of data starts from load_policies_and_compartments, which loads all compartments and policies recursively.

Filtering functions return lists of dataclass objects defined in models.py for easy consumption by UI or CLI layers.

See filter_policy_statements for an example of filtering and returning PolicyStatement objects.

reset_state()[source]

Resets all main state variables (lists, dictionaries, flags, clients, IDs, etc.). Call this before any data (re)load operation for a clean repository state.

initialize_client(use_instance_principal: bool, session_token: str | None = None, recursive: bool = True, profile: str = 'DEFAULT') bool[source]

Initializes the OCI client to be used for all data operations.

The client can be created using profile-based or Instance Principal authentication.

Parameters:
  • use_instance_principal – Whether to attempt Instance Principal signer-based authentication

  • recursive – Whether to load tenancy data across all compartments, or simply the root (tenancy) compartment

  • session_token – The named OCI Session Token Profile to use - must be present on the file system in the standard OCI location of .oci/config

  • profile – The named OCI Profile to use - must be present on the file system in the standard OCI location of .oci/config

Returns:

A boolean indicating whether the client was created successfully. False indicates that an unrecoverable issue occurred setting up the client.

check_statement_location_validity(st)[source]

Checks if the compartment location for a statement is valid (exists and is ACTIVE).

Parameters:

st – The policy statement (dict).

Returns:

None if valid; string message if invalid.
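
A minimal sketch of how the None-or-message return value might be consumed. The helper name collect_location_problems is hypothetical, and repo is assumed to be any object exposing check_statement_location_validity as documented above.

```python
def collect_location_problems(repo, statements):
    """Pair each invalid statement with its problem message."""
    problems = []
    for st in statements:
        message = repo.check_statement_location_validity(st)
        # None means the compartment exists and is ACTIVE; anything else
        # is a human-readable reason the location is invalid.
        if message is not None:
            problems.append((st, message))
    return problems
```

An empty result means every statement passed the location check.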

load_policies_and_compartments() bool[source]

Optimized bulk loading of all compartments and all policies using OCI Clients.

  1. Fetch compartments (hierarchy, flat)

  2. Fetch policies (threaded fetch/parse) from OCI Resource Search

  3. No queue or milestone progress emission

load_complete_identity_domains(load_all_users: bool = True) bool[source]

Loads everything into the central JSON dictionary.

Identity Domains are loaded via the Identity Client. For each Identity Domain, load the Dynamic Groups, Groups, and Users

Parameters:

load_all_users (bool) – If False, skip loading users. Default is True (backwards compatible).

Returns:

A boolean indicating success of the data load. False indicates there was some failure in loading data, so it may be incomplete.
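
The live-loading methods above can be chained as in the following sketch. The helper name load_live_tenancy is hypothetical, and loading policies before identity domains is an assumption rather than a documented requirement. repo is assumed to be a PolicyAnalysisRepository instance (or any object with the same methods).

```python
def load_live_tenancy(repo, profile="DEFAULT", load_all_users=True):
    """Run the documented load steps in order; True only if all succeed."""
    # Clear previous state before any (re)load, as reset_state() recommends.
    repo.reset_state()
    if not repo.initialize_client(use_instance_principal=False, profile=profile):
        return False  # unrecoverable client setup problem
    if not repo.load_policies_and_compartments():
        return False
    # A False result here means identity data may be incomplete.
    return bool(repo.load_complete_identity_domains(load_all_users=load_all_users))
```

Stopping at the first False keeps later steps from running against a partially initialized repository.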

filter_policy_statements(filters: PolicySearch) list[RegularPolicyStatement][source]

Filter policy statements by one or more criteria.

Parameters:

filters (PolicySearch) – Dictionary of filter keys and their values (e.g. verb, resource, permission, group, etc).

Returns:

List of statements matching the filter.

Return type:

list[PolicyStatement]

filter_cross_tenancy_policy_statements(alias_filter: list[str]) list[RegularPolicyStatement][source]

Filter cross-tenancy policy statements containing any provided alias.

Parameters:

alias_filter (list[str]) – List of aliases to look for in statement text.

Returns:

Filtered cross-tenancy policy statements.

Return type:

list[PolicyStatement]

get_users_for_group(group: Group) list[User][source]

Return all users that belong to the specified exact group. Membership is determined by matching the group name and domain name.

Parameters:

group (Group) –

A dictionary with keys:

  • domain (str | None)

  • name (str)

Returns:

A list of Users that belong to the specified group. If the group does not exist or has no members, returns an empty list.

Return type:

list[User]
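
A short sketch of a membership lookup, assuming each returned User can be read like a dictionary with a user_name key (as described under filter_users). The helper name member_user_names is hypothetical.

```python
def member_user_names(repo, group_name, domain=None):
    """Return the sorted user names of one exact group."""
    # The group argument uses the documented 'domain' and 'name' keys.
    group = {"domain": domain, "name": group_name}
    users = repo.get_users_for_group(group)
    # An unknown or empty group yields an empty list, so this is safe.
    return sorted(user["user_name"] for user in users)
```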

get_groups_for_user(user: User) list[Group][source]

Return the list of all Groups that a user is a member of.

Parameters:

user (User) – The user to find groups for.

Returns:

A list of Groups that the user is a member of.

Return type:

list[Group]

filter_groups(group_filter: GroupSearch) list[Group][source]

Filter groups based on the provided filter. This is a public function used by the MCP interface and the UI.

filter_users(user_filter: UserSearch) list[User][source]

Filter users based on the provided filter.

This function is used by the MCP interface and the UI.

Parameters:

user_filter (UserSearch) –

A dictionary with optional keys.

  • domain_name (list[str]): Domain names to filter by (case-insensitive).

  • search (list[str]): Search terms to match against usernames and display names (case-insensitive).

  • user_ocid (list[str]): User OCIDs to filter by (case-insensitive).

Returns:

Users that match the filter criteria.

Each User is represented as a dictionary with keys:

  • domain_name (str | None): Domain name of the user.

  • user_name (str): Username.

  • user_ocid (str): OCID of the user.

  • display_name (str): Display name of the user.

  • email (str): Email of the user.

  • user_id (str): Internal ID of the user.

  • groups (list[str]): Group OCIDs the user belongs to.

Return type:

list[User]
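
The filter keys above can be assembled as in this sketch. The helper name find_users is hypothetical; the filter keys and the user_name and email result keys are the ones listed above.

```python
def find_users(repo, search_terms, domain_names=None):
    """Search users by name terms, optionally restricted to some domains."""
    # Every filter value is a list of strings, per the documented keys.
    user_filter = {"search": list(search_terms)}
    if domain_names:
        user_filter["domain_name"] = list(domain_names)
    return [(user["user_name"], user["email"])
            for user in repo.filter_users(user_filter)]
```

Pass search_terms as a list such as ["anita"]; a bare string would be split into single characters by list().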

filter_dynamic_groups(filters: DynamicGroupSearch) list[DynamicGroup][source]

Filter dynamic groups using JSON-based filters.

Parameters:

filters (DynamicGroupSearch) –

A mapping of filter keys to one or more values.

  • OR: multiple values within a field act as logical OR.

  • AND: multiple fields are combined as logical AND.

Supported keys:

  • domain_name → matches “Domain”

  • dynamic_group_name → matches “DG Name”

  • matching_rule → matches “Matching Rule”

  • dynamic_group_ocid → matches “DG OCID”

  • in_use → matches “In Use” (True/False)

Returns:

A list of dynamic groups that satisfy the filters.

Each dynamic group is represented as a dictionary with keys:

  • domain_name (str | None): The domain name of the dynamic group.

  • dynamic_group_name (str): The name of the dynamic group.

  • dynamic_group_id (str): The ID of the dynamic group.

  • dynamic_group_ocid (str): The OCID of the dynamic group.

  • matching_rule (str): The matching rule of the dynamic group.

  • description (str): The description of the dynamic group.

  • in_use (bool): Whether the dynamic group is in use.

  • creation_time (str): The creation timestamp of the dynamic group.

  • created_by_name (str): The name of the user who created the dynamic group.

  • created_by_ocid (str): The OCID of the user who created the dynamic group.

Return type:

list[DynamicGroup]

Raises:

ValueError – If an unknown filter key is provided.
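
The OR-within-a-field, AND-across-fields rule can be illustrated with a standalone sketch. This is not the repository's implementation: the function names are hypothetical, and case-insensitive substring matching for text fields is an assumption.

```python
SUPPORTED_KEYS = {
    "domain_name", "dynamic_group_name", "matching_rule",
    "dynamic_group_ocid", "in_use",
}

def _as_list(value):
    """Accept a single value or a collection of alternatives."""
    return list(value) if isinstance(value, (list, tuple, set)) else [value]

def _field_matches(actual, wanted_values, is_flag):
    if is_flag:
        return any(bool(wanted) == bool(actual) for wanted in wanted_values)
    text = str(actual or "").lower()
    # Assumption: case-insensitive substring match for text fields.
    return any(str(wanted).lower() in text for wanted in wanted_values)

def filter_dynamic_groups_sketch(groups, filters):
    unknown = set(filters) - SUPPORTED_KEYS
    if unknown:
        raise ValueError(f"Unknown filter key(s): {sorted(unknown)}")
    return [
        group for group in groups
        # AND across fields, OR across the values of one field.
        if all(
            _field_matches(group.get(key), _as_list(values), key == "in_use")
            for key, values in filters.items()
        )
    ]
```

For example, {"dynamic_group_name": ["build", "scan"], "domain_name": ["default"]} keeps groups in a matching domain whose name contains either term.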

load_from_compliance_output_dir(dir_path: str, load_all_users: bool = True) bool[source]

Load all compartments, domains, groups, users, dynamic groups, and policies from compliance tool output files.

Starts with domains, then dynamic groups, then users/groups/membership, then compartments, then policies. This function is for offline/compliance output analysis: no attempt to initialize any OCI client.

Parameters:
  • dir_path (str) – Path to a directory containing the expected compliance output files.

  • load_all_users (bool) – If False, skip loading users. Default is True.

Returns:

True if all files parsed and data loaded successfully, False otherwise.

Return type:

bool
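
A minimal offline-loading sketch. The helper name load_offline and the up-front directory check are illustrative additions; only reset_state and load_from_compliance_output_dir come from the documentation above.

```python
from pathlib import Path

def load_offline(repo, dir_path, load_all_users=True):
    """Load compliance tool output without creating any OCI client."""
    # Fail fast on a missing directory instead of a partial load.
    if not Path(dir_path).is_dir():
        return False
    repo.reset_state()
    return bool(
        repo.load_from_compliance_output_dir(
            str(dir_path), load_all_users=load_all_users
        )
    )
```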

Reference Data Repository

GenAI Integration

class oci_policy_analysis.logic.ai_repo.AI[source]

Bases: object

AI Module for OCI Policy Analysis

Contains all of the available GenAI calls that can be made to obtain additional context.

genai_client

The OCI GenAI Client.

genai_inference_client

The OCI GenAI Inference Client

initialize_client(use_instance_principal: bool, session_token: str | None = None, profile: str = 'DEFAULT') bool[source]

Initialize OCI GenAI Client with authentication.

Parameters:
  • use_instance_principal (bool) – Whether to use Instance Principal authentication.

  • session_token (str | None) – Session token profile name for authentication.

  • profile (str) – Profile name for authentication if not using instance principal or session token.

Returns:

True if initialization is successful, False otherwise.

Return type:

bool

update_config(model_ocid, endpoint, compartment_ocid)[source]

Update Model ID and Endpoint, reinitializing client if endpoint changes.

Parameters:
  • model_ocid (str) – The model OCID to use.

  • endpoint (str) – The endpoint URL to use.

  • compartment_ocid (str) – The compartment OCID for requests.

list_models() list[dict][source]

List available models using GenerativeAiClient.list_models.

Returns:

A list of available GenAI models with their details.

Return type:

list[dict]

async analyze_policy_statement(policy_text: str, format: str = 'Markdown', queue: Queue | None = None, additional_instruction: str = '')[source]

Analyze a policy statement using the GenAI Inference Client.

Parameters:
  • policy_text (str) – The policy statement text to analyze.

  • format (str) – The result format to use. “Markdown” means the GenAI should return markdown, “Text” means plain text using newlines for paragraphs and no bullet points, no markdown, no HTML.

  • queue (queue.Queue|None) – Optional queue to put the result into for async calls.

  • additional_instruction (str) – Additional instructions to include in the prompt.

Returns:

The analysis result from the AI model.

Return type:

str
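
Because analyze_policy_statement is a coroutine, synchronous callers need an event loop. The sketch below shows one way to do that with asyncio.run; the helper name explain_statement is hypothetical, and ai is assumed to be an already initialized AI instance.

```python
import asyncio

def explain_statement(ai, policy_text, extra=""):
    """Run the async analysis call to completion from synchronous code."""
    return asyncio.run(
        ai.analyze_policy_statement(
            policy_text,
            format="Text",  # plain text, no markdown
            additional_instruction=extra,
        )
    )
```

asyncio.run cannot be called from inside a running event loop, so code that is already async should await the method directly instead.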

Policy Intelligence & Analytics

class oci_policy_analysis.logic.policy_intelligence.PolicyIntelligenceEngine(policy_repo)[source]

Bases: object

Provides post-load intelligence, overlap analysis, and advanced policy insights on OCI policies.

This engine operates on a loaded PolicyAnalysisRepository and delivers advanced analytics such as risk scores, overlaps, recommendations, and policy hygiene findings.

Parameters:

policy_repo (PolicyAnalysisRepository) – The repository containing loaded compartment, policy, and identity data.

policy_repo

Source of OCI policy, identity, and compartment data.

Type:

PolicyAnalysisRepository

overlay

Stores the full results of all analytics (risk, overlaps, recommendations, etc).

Type:

PolicyIntelligence

permissions_report

Holds the effective permissions report used by various UI components.

Type:

dict

build_permissions_report()[source]

Build a detailed nested report of effective permissions for all policy statements.

Scans all statements, resolves permissions and subjects, and groups by effective path and subject. Stores both the structured report and supporting lookup maps in self.permissions_report.

Returns:

A structure with keys “report”, “resource_map”, “perm_conditionals”, and “perm_statements” containing all effective allow/deny permissions and metadata, or empty dicts if no data is present.

Return type:

dict

run_dg_in_use_analysis()[source]

Analyzes Dynamic Group data for unused Dynamic Groups. Should be called after repo is loaded and statements parsed.

analyze_policy_overlap()[source]

Analyze policy statements for potential overlaps. Call this after all statements have been loaded and parsed. Stores the result in self.overlay[“overlaps”] using the PolicyIntelligence overlay structure.

get_policy_overlaps_by_internal_id(internal_id: str)[source]

Returns all PolicyOverlap entries for a given policy statement internal ID by looking up the overlay model.

find_invalid_statements()[source]

Mark regular policy statements as invalid if they fail various validity checks, such as:

  • Nonexistent Dynamic Groups or Groups

  • Invalid compartment OCIDs

  • Invalid verbs/resources

This method modifies the statements in-place, adding an invalid_reasons list if applicable.

calculate_effective_compartment_for_statement(st)[source]

Calculate effective compartment OCID and path for a single statement, mutating st in place. Uses indexes built via build_compartment_index().

calculate_all_effective_compartments()[source]

Resolve the effective compartment for every statement by looping through all statements and calculating each one.

build_compartment_index()[source]

Build quick-lookup structures for resolving compartment names and parent/child relationships used by calculate_effective_compartment_for_statement().

calculate_potential_risk_scores(where_clause_reduction_pct=50)[source]

Calculates a potential risk score for each policy statement from its permission risk values (from reference data), multiplied by a compartment exposure factor based on the statement's scope in the compartment hierarchy. Statements with a WHERE clause (‘conditions’) have their raw score reduced by an adjustable percentage (where_clause_reduction_pct).

Results are stored in self.overlay[“risk_scores”] as a list of dicts:

{ “statement_internal_id”, “score”, “notes”, “recommendations” }
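
The arithmetic described above can be sketched as follows. This is an illustration only: the function name is hypothetical, and summing the permission risk values into a raw score is an assumption, since the documentation does not say how they are combined.

```python
def potential_risk_score(permission_risks, exposure_factor,
                         has_where_clause, where_clause_reduction_pct=50):
    """Raw permission risk, reduced for WHERE clauses, scaled by exposure."""
    # Assumption: the raw score is the sum of per-permission risk values.
    raw = sum(permission_risks)
    if has_where_clause:
        # A 50% reduction keeps half of the raw score.
        raw *= (100 - where_clause_reduction_pct) / 100
    return raw * exposure_factor
```

With risks [3, 5] and an exposure factor of 2.0, the score is 16.0 without a WHERE clause and 8.0 with one at the default 50% reduction.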

build_cleanup_items()[source]

Analyze policy repository and collect actionable cleanup items for all key risk categories. This should be called BEFORE build_overall_recommendations. The lists are attached to self.overlay[“cleanup_items”].

build_overall_recommendations()[source]

Build the overall (user-facing) recommendations list for overlay[“recommendations”]. Each recommendation summarizes the count of existing actionable issues. Assumes build_cleanup_items has already been called and populated “cleanup_items”. If no real recommendations are found, yields one informational finding as a placeholder.

Example of recommendation dict:

{
    'Recommendation': 'Investigate invalid policy statements',
    'Priority': 'High',
    'Category': 'Policy Hygiene',
    'Notes': '3 invalid policy statement(s) detected. Review the cleanup/fix tab for details.',
    'Action': 'Plan: Review and remediate invalid policy statements',
    'ActionDetail': 'Examine policies with invalid statements and resolve as appropriate.',
}

This could change in the future to include risk score-based recommendations.
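
The required call order can be captured in a small helper. The name high_priority_recommendations is hypothetical; the overlay keys and the 'Priority' field are taken from the descriptions above, and engine is assumed to be a PolicyIntelligenceEngine whose repository is already loaded.

```python
def high_priority_recommendations(engine):
    """Build cleanup items first, then recommendations, and keep 'High' ones."""
    # build_cleanup_items must run BEFORE build_overall_recommendations.
    engine.build_cleanup_items()
    engine.build_overall_recommendations()
    return [
        rec for rec in engine.overlay["recommendations"]
        if rec.get("Priority") == "High"
    ]
```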

build_policy_consolidation()[source]

Analyze policies/statements for possible consolidation opportunities and populate overlay[“consolidations”] with a list of dicts:

{
    "Statement": ...,
    "Policy Name(s)": ...,
    "Compartment": ...,
    "Principal": ...,
    "Service/Resource": ...,
    "Consolidation Reason": ...,
    "Action": ...,         # always present now
    "ActionDetail": ...    # optional, for detail/planning dialog
}

Criteria:
  1. Policies with only a single statement (likely consolidation candidate).

  2. Statements with identical principal, compartment, and service/resource, but split across multiple differently-named policies (should suggest merge).
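
The two criteria can be illustrated with a standalone sketch. It is not the engine's implementation: the function names and the statement keys policy_name, principal, compartment, and resource are hypothetical stand-ins for whatever the parsed statements actually carry.

```python
from collections import Counter, defaultdict

def single_statement_policies(statements):
    """Criterion 1: policies that contain exactly one statement."""
    counts = Counter(st["policy_name"] for st in statements)
    return sorted(name for name, count in counts.items() if count == 1)

def split_across_policies(statements):
    """Criterion 2: identical targets spread over several policies."""
    policies_by_target = defaultdict(set)
    for st in statements:
        target = (st["principal"], st["compartment"], st["resource"])
        policies_by_target[target].add(st["policy_name"])
    # Keep only targets that appear under more than one policy name.
    return {
        target: sorted(names)
        for target, names in policies_by_target.items()
        if len(names) > 1
    }
```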

Simulation Engine

class oci_policy_analysis.logic.simulation_engine.PolicySimulationEngine(policy_repo=None, ref_data_repo=None)[source]

Bases: object

Central simulation service for OCI policies.

— Stateless simulation setup APIs (recommended for UI/MCP) —

These methods provide the canonical way to derive principal keys, resolve all applicable policy statements, and compute required where-clause parameter names, regardless of UI vs MCP flow.

Step 1: Normalize compartment_path and principal_type/principal into principal_key (handles tuple/string, None/‘default’, etc.).

-> Use normalize_principal_key(principal_type, principal)

Step 2: Find all statements for a context (compartment_path, principal_type, principal).

-> Use find_applicable_statements_for_context(compartment_path, principal_type, principal)

Step 3: Get all required where-clause variable names for a context.

-> Use get_required_where_fields_for_context(compartment_path, principal_type, principal)

The MCP version always uses all valid applicable statements for a context.

Responsibilities:

  • Given principal + effective compartment, generate the list of all applicable policy statements (including ‘any-user’).

  • Build and cache a mapping of compartment -> principal -> [statements] for fast lookup.

  • For a given policy context, extract all required where-clause fields, and evaluate conditional logic against input values.

  • For a set of user choices (principal + compartment + where values), return the effective permission set and permission simulation trace.

  • Fold in/replace all old where simulation logic found elsewhere (single source of truth).

  • DO NOT load or store raw data; consumption only.

Primary entrypoints:
  • get_applicable_statements(principal, compartment): List[dict]

  • get_required_where_fields(statements): Set[str]

  • simulate_permissions(principal, compartment, where_context: dict): (Set[str], List[dict])

  • simulate_api_operation(principal, compartment, operation, where_context: dict): dict (YES/NO + full trace)

get_statements_for_context(compartment_path, principal_type, principal=None)[source]

Canonical: Return (principal_key, [statement dicts]) for a given policy simulation context (UI/MCP).

Parameters:
  • compartment_path (str) – Path like ‘ROOT/Finance’

  • principal_type (str) – “user”, “group”, etc.

  • principal (optional) – None, string, or (domain, name)

Returns:

Tuple with normalized principal key and list of matching statements.

Return type:

(principal_key, statements)

See docs/source/simulation_engine_usage_examples.py for usage.

get_required_where_fields_for_context(compartment_path, principal_type, principal=None)[source]

Canonical: Given a context, return (principal_key, set of required where input variables).

Parameters:
  • compartment_path (str) – e.g. “ROOT/Finance”

  • principal_type (str) – “user”, “group”, etc.

  • principal (optional) – None, string, or (domain, name)

Returns:

Tuple with normalized principal key and set of all required where fields.

Return type:

(principal_key, required_fields)

See docs/source/simulation_engine_usage_examples.py for usage.
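
The two canonical context calls can be combined as in this sketch. The helper name prepare_simulation and the shape of its return value are hypothetical; the two engine methods and their tuple results are as documented above.

```python
def prepare_simulation(engine, compartment_path, principal_type, principal=None):
    """Collect everything a UI or MCP caller needs before simulating."""
    principal_key, statements = engine.get_statements_for_context(
        compartment_path, principal_type, principal
    )
    # The second call returns the same normalized key plus the where inputs.
    _, required_fields = engine.get_required_where_fields_for_context(
        compartment_path, principal_type, principal
    )
    return {
        "principal_key": principal_key,
        "statements": statements,
        "required_fields": sorted(required_fields),
    }
```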

static extract_variable_names(cond_str)[source]

Extract variable names from a policy condition string using the OCI IAM Policy Condition parser.

Parameters:

cond_str (str) – The condition string to analyze.

Returns:

A set of variable names (as strings) present in the condition clause.

Return type:

set[str]
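
To show the kind of result to expect, here is a rough regex-based approximation. It is not the OCI IAM Policy Condition parser that extract_variable_names uses; the function name and the patterns are assumptions, and it only recognizes dotted names such as request.user.name.

```python
import re

# Quoted literals and /pattern/ literals may contain dots, so drop them first.
_QUOTED = re.compile(r"'[^']*'|\"[^\"]*\"")
_PATTERN_LITERAL = re.compile(r"/[^/\s]*/")
_DOTTED_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_-]*(?:\.[A-Za-z_][A-Za-z0-9_-]*)+")

def approx_variable_names(cond_str):
    """Approximate the set of variable names in a condition string."""
    stripped = _QUOTED.sub(" ", cond_str)
    stripped = _PATTERN_LITERAL.sub(" ", stripped)
    return set(_DOTTED_NAME.findall(stripped))
```

A real parser handles nesting and edge cases that this approximation ignores, so prefer extract_variable_names whenever the engine is available.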

simulate_and_record(principal_key: str, effective_path: str, api_operation: str, where_context: dict, checked_statement_ids: list[str] | None = None, trace_name: str | None = None, trace: bool = False) dict[str, Any][source]

Simulates permissions for a principal, compartment, and operation, recording full trace/results.

Parameters:
  • principal_key (str) – The unique principal key (e.g., “group:default/Admins”) for the simulation.

  • effective_path (str) – The effective compartment path for simulation.

  • api_operation (str) – The API operation to check permission for.

  • where_context (dict) – Input/context for any evaluated ‘where’ policy clauses.

  • checked_statement_ids (Optional[list[str]]) – List of policy statement internal_ids to include in simulation. If omitted or None, all applicable statements for the context (principal_key and effective_path) are used.

  • trace_name (str, optional) – Optional name for this simulation trace/history.

  • trace (bool, optional) – If True, includes detailed step-by-step trace; else, summary only.

Returns:

Full simulation result with yes/no, permissions, missing/revoked, per-statement decision trace, and trace object suitable for history review.

Return type:

dict[str, Any]

Example

result = engine.simulate_and_record('group:default/Admins', 'ROOT', 'oci:ListBuckets', {}, checked_ids, trace=True)
result = engine.simulate_and_record('group:default/Admins', 'ROOT', 'oci:ListBuckets', {}, trace=True)  # auto selects statements
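
A slightly fuller sketch that checks the where-clause inputs before simulating. The helper name simulate_checked is hypothetical, and treating missing inputs as an error is a design choice of this sketch, not documented engine behavior.

```python
def simulate_checked(engine, principal_key, effective_path,
                     api_operation, where_context):
    """Simulate one API operation after validating where-clause inputs."""
    statements = engine.get_applicable_statements(principal_key, effective_path)
    required = engine.get_required_where_fields(statements)
    missing = sorted(set(required) - set(where_context))
    if missing:
        raise ValueError(f"Missing where-clause inputs: {missing}")
    # Omitting checked_statement_ids lets the engine use all applicable statements.
    return engine.simulate_and_record(
        principal_key=principal_key,
        effective_path=effective_path,
        api_operation=api_operation,
        where_context=where_context,
        trace=True,
    )
```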

get_simulation_trace_list() list[dict[str, str]][source]

Get the list of names and timestamps for all recorded simulation traces.

Returns:

List of dictionaries with keys ‘name’ and ‘timestamp’ corresponding to each simulation history entry.

Return type:

list[dict[str, str]]

Example

[{'name': 'Simulation 1', 'timestamp': '2026-01-05T14:00:01'}, ...]

get_simulation_trace_by_index(idx: int) dict[str, Any] | None[source]

Retrieve a previously recorded simulation trace result by history index.

Parameters:

idx (int) – Index into simulation history (0 = oldest).

Returns:

The trace result dict, or None if index is out of range.

Return type:

dict[str, Any] or None

Example

>>> engine.get_simulation_trace_by_index(0)
{'result': 'YES', ...}

get_simulation_trace_by_name(name: str) dict[str, Any] | None[source]

Retrieve a simulation trace result by name label.

Parameters:

name (str) – Name of the simulation trace to look up.

Returns:

The simulation trace result, or None if no match.

Return type:

dict[str, Any] or None
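
The history accessors can be combined to fetch the most recent trace. The helper name latest_trace is hypothetical, and it assumes the trace list is in the same order as the history index (0 = oldest).

```python
def latest_trace(engine):
    """Return the newest recorded simulation trace, or None if none exist."""
    entries = engine.get_simulation_trace_list()
    if not entries:
        return None
    # Index 0 is the oldest entry, so the newest is the last one.
    return engine.get_simulation_trace_by_index(len(entries) - 1)
```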

get_api_operations(filter_text: str = '') list[source]

Get all known API operation names, optionally filtered by substring.

Parameters:

filter_text (str, optional) – Substring to filter operation names (case-insensitive). Defaults to ‘’ (all ops).

Returns:

Sorted list of API operation names.

Return type:

list[str]

Example

>>> engine.get_api_operations('bucket')
['oci:ListBuckets', 'oci:CreateBucket', ...]

get_applicable_statements(principal_key: str, effective_path: str) list[dict][source]

Get all applicable policy statements for a principal and compartment by building a proper PolicySearch filter using the principal_key and effective_path.

Parameters:
  • principal_key (str) – Canonical principal key (e.g., ‘user:Default/anita’)

  • effective_path (str) – Compartment path for simulation context (e.g., ‘ROOT/Finance’)

Returns:

List of matching policy statement dicts.

Return type:

list[dict]

Notes

  • Used by all canonical orchestration flows (UI/MCP)

  • See usage examples in docs/source/simulation_engine_usage_examples.py

get_required_where_fields(statements: list[dict]) set[str][source]

Extract all unique “where clause” input field names needed by a set of policy statements.

Parameters:

statements (list[dict]) – The statements to examine for conditional/where fields.

Returns:

Set of all unique field names required as variables for condition evaluation.

Return type:

set[str]

Example

>>> fields = engine.get_required_where_fields(statements)
>>> print(fields)  # e.g. {'request.time', 'user.department'}

generate_where_clauses_for_statements(statements: list[dict]) dict[source]

Utility: Generate a mapping from statement internal_id to set of where-variable names required (for UI preview purposes).

Parameters:

statements (list[dict]) – Statements to analyze.

Returns:

{internal_id: set(field names)} for statements that have where conditions.

Return type:

dict
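
For a UI preview it can be convenient to invert the returned mapping, so each where-variable lists the statements that need it. The helper name statements_by_where_field is hypothetical; only generate_where_clauses_for_statements comes from the engine.

```python
from collections import defaultdict

def statements_by_where_field(engine, statements):
    """Invert {internal_id: fields} into {field: [internal_id, ...]}."""
    per_statement = engine.generate_where_clauses_for_statements(statements)
    by_field = defaultdict(list)
    for internal_id, fields in per_statement.items():
        for field in fields:
            by_field[field].append(internal_id)
    # Sort the IDs so the preview is stable between runs.
    return {field: sorted(ids) for field, ids in by_field.items()}
```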