OCI Policy Analysis Data and Logic
Policy Repository
- class oci_policy_analysis.logic.data_repo.PolicyAnalysisRepository[source]
Bases: object
This is the main data repository for Policy, Identity, and Compartment data.
During initialization, the entire compartment hierarchy and policy tree is loaded into a central JSON dictionary. This central dictionary is then referenced by functions that filter and return a subset of information for display. Parsing, additional analysis, and import/export are made available by additional functions exposed.
Loading of data starts from load_policies_and_compartments, which loads all compartments and policies recursively.
Filtering functions return lists of dataclass objects defined in models.py for easy consumption by UI or CLI layers.
See filter_policy_statements for an example of filtering and returning PolicyStatement objects.
- reset_state()[source]
Resets all main state variables (lists, dictionaries, flags, clients, IDs, etc.). Call this before any data (re)load operation for a clean repository state.
- initialize_client(use_instance_principal: bool, session_token: str | None = None, recursive: bool = True, profile: str = 'DEFAULT') bool[source]
Initializes the OCI client to be used for all data operations
Client can be loaded using PROFILE or Instance Principal authentication methods
- Parameters:
use_instance_principal – Whether to attempt Instance Principal signer-based authentication
recursive – Whether to load tenancy data across all compartments, or simply the root (tenancy) compartment
session_token – The named OCI session token profile to use. It must be present on the file system in the standard OCI location, .oci/config
profile – The named OCI Profile to use - must be present on the file system in the standard OCI location of .oci/config
- Returns:
A boolean indicating whether the client was created successfully. False indicates that an unrecoverable issue occurred setting up the client.
- check_statement_location_validity(st)[source]
Checks if the compartment location for a statement is valid (exists and is ACTIVE).
- Parameters:
st – The policy statement (dict).
- Returns:
None if valid; string message if invalid.
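The contract above (None when valid, a message string when not) can be pictured with a minimal sketch. It assumes a statement dict carrying a hypothetical `compartment_ocid` key and a plain dict of compartments keyed by OCID with a `lifecycle_state` field; these names are illustrative and are not confirmed by the repository's API.

```python
from typing import Optional

# Hypothetical compartment lookup: OCID -> compartment record.
COMPARTMENTS = {
    "ocid1.compartment.oc1..aaa": {"name": "prod", "lifecycle_state": "ACTIVE"},
    "ocid1.compartment.oc1..bbb": {"name": "old", "lifecycle_state": "DELETED"},
}


def check_location(st: dict, compartments: dict) -> Optional[str]:
    """Return None when the statement's compartment exists and is ACTIVE, else a message."""
    ocid = st.get("compartment_ocid")  # assumed key name, for illustration only
    comp = compartments.get(ocid)
    if comp is None:
        return f"Compartment {ocid} does not exist"
    if comp["lifecycle_state"] != "ACTIVE":
        return f"Compartment {comp['name']} is {comp['lifecycle_state']}"
    return None


valid = check_location({"compartment_ocid": "ocid1.compartment.oc1..aaa"}, COMPARTMENTS)
deleted = check_location({"compartment_ocid": "ocid1.compartment.oc1..bbb"}, COMPARTMENTS)
missing = check_location({"compartment_ocid": "ocid1.compartment.oc1..zzz"}, COMPARTMENTS)
```

Returning None for the valid case lets callers write `if (msg := check_location(...)):` and collect only the failures.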
- load_policies_only() bool[source]
Loads policies/statements only, assuming compartments are already loaded.
- load_policies_and_compartments() bool[source]
Loads both compartments and all policies using OCI Clients. (Convenience function)
- reload_compartment_policy_data() bool[source]
Reload just the policy/compartment/statement data (not IAM), and update the in-memory timestamp. (No cache operations here—see main.py/App for cache update and UI triggers.)
- Returns:
True if the reload succeeded, False otherwise.
- Return type:
bool
- fetch_tenancy_policy_statement_limits()[source]
Fetch two key limits from the OCI Limits service (“Identity”):
* policies-count (max policies in tenancy)
* statements-count (max statements per policy)
Uses _api_call_with_logging to time and log the call.
Returns a tuple (policies_count_limit, statements_per_policy_limit), or (None, None) if the limits are unavailable or an error occurs.
- load_complete_identity_domains(load_all_users: bool = True, compartment_domain_search_depth: int = 1) bool[source]
Loads users, groups, dynamic groups, and domains for all compartments up to the given depth below the root compartment. No longer uses explicit domain_compartment_ocids.
- filter_policy_statements(filters: PolicySearch) list[RegularPolicyStatement][source]
Filter policy statements by one or more criteria.
- Parameters:
filters (PolicySearch) – Dictionary of filter keys and their values (e.g. verb, resource, permission, group, etc).
- Returns:
List of statements matching the filter.
- Return type:
list[PolicyStatement]
- filter_cross_tenancy_policy_statements(alias_filter: list[str]) list[RegularPolicyStatement][source]
Filter cross-tenancy policy statements containing any provided alias.
- get_users_for_group(group: Group) list[User][source]
Return all users that belong to the specified exact group. Membership is determined by matching the group name and domain name.
- Parameters:
group (Group) – A dictionary with keys:
* ‘domain’: str | None
* ‘name’: str
- Returns:
A list of Users that belong to the specified group. If the group does not exist or has no members, returns an empty list.
- Return type:
list[User]
- get_groups_for_user(user: User) list[Group][source]
Return the list of all Groups that a user is a member of
- Parameters:
user (User) – The user to find groups for.
- Returns:
A list of Groups that the user is a member of.
- Return type:
list[Group]
- filter_groups(group_filter: GroupSearch) list[Group][source]
Filter groups based on the provided filter. Public function used by MCP or UI
- filter_users(user_filter: UserSearch) list[User][source]
Filter users based on the provided filter.
This function is used by the MCP interface and the UI.
- Parameters:
user_filter (UserSearch) –
A dictionary with optional keys:
* domain_name (list[str]): Domain names to filter by (case-insensitive).
* search (list[str]): Search terms to match against usernames and display names (case-insensitive).
* user_ocid (list[str]): User OCIDs to filter by (case-insensitive).
- Returns:
Users that match the filter criteria.
Each User is represented as a dictionary with keys:
* domain_name (str | None): Domain name of the user.
* user_name (str): Username.
* user_ocid (str): OCID of the user.
* display_name (str): Display name of the user.
* email (str): Email of the user.
* user_id (str): Internal ID of the user.
* groups (list[str]): Group OCIDs the user belongs to.
- Return type:
list[User]
- filter_dynamic_groups(filters: DynamicGroupSearch) list[DynamicGroup][source]
Filter dynamic groups using JSON-based filters.
- Parameters:
filters (DynamicGroupSearch) –
A mapping of filter keys to one or more values.
OR: multiple values within a field act as logical OR.
AND: multiple fields are combined as logical AND.
Supported keys:
* domain_name → matches “Domain”
* dynamic_group_name → matches “DG Name”
* matching_rule → matches “Matching Rule”
* dynamic_group_ocid → matches “DG OCID”
* in_use → matches “In Use” (True/False)
- Returns:
A list of dynamic groups that satisfy the filters.
Each dynamic group is represented as a dictionary with keys:
* domain_name (str | None): The domain name of the dynamic group.
* dynamic_group_name (str): The name of the dynamic group.
* dynamic_group_id (str): The ID of the dynamic group.
* dynamic_group_ocid (str): The OCID of the dynamic group.
* matching_rule (str): The matching rule of the dynamic group.
* description (str): The description of the dynamic group.
* in_use (bool): Whether the dynamic group is in use.
* creation_time (str): The creation timestamp of the dynamic group.
* created_by_name (str): The name of the user who created the dynamic group.
* created_by_ocid (str): The OCID of the user who created the dynamic group.
- Return type:
list[DynamicGroup]
- Raises:
ValueError – If an unknown filter key is provided.
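The OR-within-a-field, AND-across-fields rule and the ValueError on unknown keys can be sketched as follows. This is a standalone illustration, not the repository's code: the helper name `filter_dgs` is hypothetical, and case-insensitive substring matching on text fields is an assumption.

```python
SUPPORTED_KEYS = {
    "domain_name", "dynamic_group_name", "matching_rule",
    "dynamic_group_ocid", "in_use",
}


def _as_list(value):
    """Accept a single value or a collection of values."""
    return list(value) if isinstance(value, (list, tuple, set)) else [value]


def _field_matches(dg: dict, key: str, wanted: list) -> bool:
    """OR: the field matches if any one of the wanted values matches."""
    if key == "in_use":
        return dg.get(key) in wanted
    text = str(dg.get(key) or "").lower()
    # Assumption: case-insensitive substring match on text fields.
    return any(str(w).lower() in text for w in wanted)


def filter_dgs(dgs: list, filters: dict) -> list:
    """Hypothetical stand-in showing the filter semantics described above."""
    unknown = set(filters) - SUPPORTED_KEYS
    if unknown:
        raise ValueError(f"Unknown filter key(s): {sorted(unknown)}")
    # AND: every supplied field must match.
    return [
        dg for dg in dgs
        if all(_field_matches(dg, k, _as_list(v)) for k, v in filters.items())
    ]


DGS = [
    {"domain_name": "Default", "dynamic_group_name": "app-instances", "in_use": True},
    {"domain_name": "Default", "dynamic_group_name": "legacy", "in_use": False},
    {"domain_name": "Dev", "dynamic_group_name": "dev-functions", "in_use": True},
]

in_use_default = filter_dgs(DGS, {"domain_name": ["default"], "in_use": [True]})
by_name = filter_dgs(DGS, {"dynamic_group_name": ["legacy", "dev"]})
```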
- load_from_compliance_output_dir(dir_path: str, load_all_users: bool = True) bool[source]
Load all compartments, domains, groups, users, dynamic groups, and policies from compliance tool output files.
Always resets the reload time (policy_data_reloaded) so that reload is not shown for compliance/CSV data.
Starts with domains, then dynamic groups, then users/groups/membership, then compartments, then policies. This function is for offline/compliance output analysis: no attempt to initialize any OCI client.
Reference Data Repository
- class oci_policy_analysis.logic.reference_data_repo.ReferenceDataRepo(json_dir='permissions')[source]
Bases: object
Repository for reference data on resources, families, and permissions. Loads from JSON files in a specified directory.
JSON Structure Example:
{
    "resources": {
        "resource_name": {
            "verbs": {
                "inspect": ["permission1", "permission2"],
                "read": ["permission3"],
                "use": ["permission4"],
                "manage": ["permission5"]
            }
        }
    },
    "families": {
        "family_name": {
            "resources": ["resource_name1", "resource_name2"],
            "source_url": "http://example.com/source"
        }
    }
}
Once all files are loaded, provides methods to query permissions and check overlaps.
- load_data()[source]
- get_permission_risk(permission: str, resource: str = None)[source]
Get the risk score for a single permission string (optionally for a given resource). If resource is not provided, search all resources.
This method is case-insensitive for permission and resource.
- get_permissions_risk_sum(permissions, resource: str = None)[source]
Given a list of permissions (case-insensitive), compute the summed risk score.
- get_verb_resource_risk(verb: str, resource: str)[source]
Get cumulative permission risk for all permissions associated with given verb/resource.
- get_permissions(entity, verb, action='allow')[source]
Get cumulative permissions for a resource or family at a given verb level and action. For “allow”, permissions accumulate with the verb level, so ‘manage’ yields the most permissions. For “deny”, the logic is inverted: broader verbs (like ‘inspect’) deny more permissions.
Special case: if entity == ‘all-resources’, gather permissions from all resources/types, but for the specified verb only. For ‘allow’, union the specific-verb permissions from all. For ‘deny’, union the same but these are what is DENIED.
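One way to read the allow/deny behaviour is sketched below. It assumes each verb entry in the reference JSON lists only the permissions added at that level, and that a deny at a given verb covers that level and everything above it; both are interpretations of the description, not confirmed details of ReferenceDataRepo. The function name and sample data are hypothetical.

```python
VERB_ORDER = ["inspect", "read", "use", "manage"]

# Hypothetical reference data in the documented "resources" -> "verbs" shape.
REFERENCE = {
    "resources": {
        "buckets": {
            "verbs": {
                "inspect": ["BUCKET_INSPECT"],
                "read": ["BUCKET_READ"],
                "use": ["BUCKET_UPDATE"],
                "manage": ["BUCKET_CREATE", "BUCKET_DELETE"],
            }
        }
    }
}


def cumulative_permissions(resource: str, verb: str, action: str = "allow") -> set:
    """Collect permissions for a resource at a verb level (sketch only)."""
    verbs = REFERENCE["resources"][resource.lower()]["verbs"]
    idx = VERB_ORDER.index(verb.lower())
    if action == "allow":
        levels = VERB_ORDER[: idx + 1]  # allow read => inspect + read
    else:
        levels = VERB_ORDER[idx:]       # deny inspect => inspect through manage
    perms = set()
    for level in levels:
        perms.update(verbs.get(level, []))
    return perms


allow_read = cumulative_permissions("buckets", "read")
deny_manage = cumulative_permissions("buckets", "manage", action="deny")
deny_inspect = cumulative_permissions("buckets", "inspect", action="deny")
```

Under this reading, `deny inspect` removes every permission for the resource while `deny manage` removes only the manage-level ones, which matches the statement that broader deny verbs deny more.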
- check_overlap(perm_set1, perm_set2)[source]
Check for overlapping permissions between two permission sets. Uses 2 lists of permissions. Always compares and returns upper case permissions (display, logic, and reporting).
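A minimal sketch of the comparison, assuming the overlap is a plain set intersection after upper-casing and that the result is returned as a sorted list (the actual return type is not stated). The name `overlap_upper` is hypothetical.

```python
def overlap_upper(perm_set1, perm_set2) -> list:
    """Upper-case both permission lists and return the sorted intersection."""
    upper1 = {p.upper() for p in perm_set1}
    upper2 = {p.upper() for p in perm_set2}
    return sorted(upper1 & upper2)


common = overlap_upper(["bucket_read", "OBJECT_READ"], ["BUCKET_READ", "bucket_delete"])
```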
- check_overlap_params(entity1, verb1, action1, entity2, verb2, action2)[source]
Check overlapped permissions by specifying both sides as entity/verb/action.
- get_source(entity)[source]
Retrieve the source URL(s) for a given entity (resource or family) in a case-insensitive manner.
- has_api_operation_permissions(operation_name, granted_permissions)[source]
Check if all required permissions for the given API operation are present in the granted_permissions list.
- Parameters:
operation_name (str) – Name of the API operation (as in the ‘operations’ node).
granted_permissions (list[str]) – List of permission strings to check.
- Returns:
True if all required permissions for the operation are present, False otherwise.
- Return type:
bool
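The check amounts to a subset test. The sketch below assumes the ‘operations’ node maps an operation name to its list of required permissions, compares case-insensitively, and treats an unknown operation as not permitted; all three are assumptions, and `has_all_permissions` and the sample map are hypothetical.

```python
# Hypothetical 'operations' node: operation name -> required permissions.
OPERATIONS = {
    "oci:ListBuckets": ["BUCKET_INSPECT"],
    "oci:GetObject": ["OBJECT_READ"],
}


def has_all_permissions(operation_name: str, granted_permissions: list, operations: dict) -> bool:
    """True only if every permission required by the operation was granted."""
    if operation_name not in operations:
        return False  # assumption: unknown operations are never satisfied
    required = {p.upper() for p in operations[operation_name]}
    granted = {p.upper() for p in granted_permissions}
    return required <= granted


can_list = has_all_permissions("oci:ListBuckets", ["bucket_inspect"], OPERATIONS)
can_get = has_all_permissions("oci:GetObject", ["BUCKET_INSPECT"], OPERATIONS)
```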
GenAI Integration
- class oci_policy_analysis.logic.ai_repo.AI[source]
Bases: object
AI Module for OCI Policy Analysis
Contains all of the available GenAI calls that can be made to obtain additional context.
- genai_client
The OCI GenAI Client.
- genai_inference_client
The OCI GenAI Inference Client
- initialize_client(use_instance_principal: bool, session_token: str | None = None, profile: str = 'DEFAULT') bool[source]
Initialize OCI GenAI Client with authentication.
- Parameters:
use_instance_principal (bool) – Whether to use Instance Principal authentication.
session_token (str | None) – Session token profile name for authentication.
profile (str) – Profile name for authentication if not using instance principal or session token.
- Returns:
True if initialization is successful, False otherwise.
- Return type:
bool
- update_config(model_ocid, endpoint, compartment_ocid)[source]
Update Model ID and Endpoint, reinitializing client if endpoint changes.
- Parameters:
model_ocid (str) – The model OCID to use.
endpoint (str) – The endpoint URL to use.
compartment_ocid (str) – The compartment OCID for requests.
- list_models() list[dict][source]
List available models using GenerativeAiClient.list_models.
- Returns:
A list of available GenAI models with their details.
- Return type:
list[dict]
- async analyze_policy_statement(policy_text: str, format: str = 'Markdown', queue: Queue | None = None, additional_instruction: str = '')[source]
Analyze a policy statement using the GenAI Inference Client.
- Parameters:
policy_text (str) – The policy statement text to analyze.
format (str) – The result format to use. “Markdown” means the GenAI should return markdown, “Text” means plain text using newlines for paragraphs and no bullet points, no markdown, no HTML.
queue (queue.Queue|None) – Optional queue to put the result into for async calls.
additional_instruction (str) – Additional instructions to include in the prompt.
- Returns:
The analysis result from the AI model.
- Return type:
Policy Intelligence & Analytics
- class oci_policy_analysis.logic.policy_intelligence.PolicyIntelligenceEngine(policy_repo, strategies: list[IntelligenceStrategy] | None = None)[source]
Bases: object
Provides post-load intelligence, overlap analysis, and advanced policy insights on OCI policies.
This engine operates on a loaded PolicyAnalysisRepository and delivers advanced analytics such as risk scores, overlaps, recommendations, and policy hygiene findings.
- Parameters:
policy_repo (PolicyAnalysisRepository) – The repository containing loaded compartment, policy, and identity data.
- policy_repo
Source of OCI policy, identity, and compartment data.
- Type:
PolicyAnalysisRepository
- overlay
Stores the full results of all analytics (risk, overlaps, recommendations, etc).
- Type:
PolicyIntelligence
- permissions_report
Holds the effective permissions report used by various UI components.
- Type:
- register_strategy(strategy: IntelligenceStrategy) None[source]
Register a single intelligence strategy (pluggable).
- get_strategy_ids() list[str][source]
Return strategy_ids in run order (for Settings and run_all filtering).
- get_strategies_for_settings() list[tuple[str, str, str]][source]
Return (strategy_id, display_name, category) for all registered strategies, in run order.
- run_all(enabled_strategy_ids: list[str] | None = None, params: dict | None = None) None[source]
Run all enabled intelligence strategies in order and merge results into overlay.
Ensures prerequisites (compartment index, invalid statements, DG in-use) are run first. If no strategies are registered, falls back to legacy method calls for backward compatibility.
- Parameters:
enabled_strategy_ids – If None, run all registered strategies (or legacy path). Otherwise run only these.
params – Optional dict (e.g. where_clause_reduction_pct, service_principal_reduction_pct, enabled_cleanup_check_ids, consolidation_strategy_names). Passed to each strategy; engine ref added.
- build_permissions_report()[source]
Build a detailed nested report of effective permissions for all policy statements.
Scans all statements, resolves permissions and subjects, and groups by effective path and subject. Stores both the structured report and supporting lookup maps in self.permissions_report.
- Returns:
A structure with keys “report”, “resource_map”, “perm_conditionals”, and “perm_statements” containing all effective allow/deny permissions and metadata, or empty dicts if no data is present.
- Return type:
- run_dg_in_use_analysis()[source]
Analyzes Dynamic Group data for unused Dynamic Groups. Should be called after repo is loaded and statements parsed.
- analyze_policy_overlap()[source]
Analyze policy statements for potential overlaps, calling after all statements loaded/parsed. Stores result in self.overlay[“overlaps”] using the new PolicyIntelligence overlay structure.
- get_policy_overlaps_by_internal_id(internal_id: str)[source]
Returns all PolicyOverlap entries for a given policy statement internal ID by looking up the overlay model.
- find_invalid_statements()[source]
Mark regular policy statements as invalid if they fail various validity checks, such as:
* Nonexistent Dynamic Groups or Groups
* Invalid compartment OCIDs
* Invalid verbs/resources
This method modifies the statements in-place, adding an invalid_reasons list if applicable.
- calculate_effective_compartment_for_statement(st)[source]
Calculate effective compartment OCID and path for a single statement, mutating st in place. Uses indexes built via build_compartment_index().
- calculate_all_effective_compartments()[source]
Resolve effective compartment for all statements. Loop through all statements and calculate.
- build_compartment_index()[source]
Build quick-lookup structures for resolving compartment names and parent/child relationships used by calculate_effective_compartment_for_statement().
- calculate_potential_risk_scores(where_clause_reduction_pct=50, service_principal_reduction_pct=50)[source]
Calculates potential risk scores for each policy statement.
Formula: risk = exposure_points × compartments_in_scope (then optional reductions for WHERE clause or service principal).
Exposure points = sum of each permission’s risk by verb level from the reference data (inspect=1, read=5, use=20, manage=50). When the statement has a permission list, each permission is looked up and its verb-level risk is summed. When it has no permission list, the reference repo returns the sum of all permissions for that verb/resource (each weighted by its verb).
Compartments in scope = number of compartments at or below the statement’s effective path in the hierarchy (e.g. “in tenancy” with 12 compartments => multiplier 12; a statement effective in a sub-compartment has a smaller multiplier).
Applies a raw score reduction for statements with a WHERE clause, and a further reduction for service-principal statements with use/manage verbs (adjustable via service_principal_reduction_pct).
- Results are stored in self.overlay[“risk_scores”] as a list of dicts:
{ “statement_internal_id”, “score”, “notes”, “recommendations” }
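A worked sketch of the formula follows. The verb weights and default percentages come from the description above; everything else is an assumption made for illustration: compartment paths are written as "/"-separated strings, the two reductions are applied multiplicatively one after the other, and the function names are hypothetical.

```python
VERB_RISK = {"inspect": 1, "read": 5, "use": 20, "manage": 50}


def exposure_points(perms_by_verb: dict) -> int:
    """Sum each permission's risk according to the verb level it belongs to."""
    return sum(VERB_RISK[verb] * len(perms) for verb, perms in perms_by_verb.items())


def compartments_in_scope(effective_path: str, all_paths: list) -> int:
    """Count compartments at or below the effective path (assumed '/' separator)."""
    prefix = effective_path.rstrip("/") + "/"
    return sum(1 for p in all_paths if p == effective_path or p.startswith(prefix))


def potential_risk(perms_by_verb, effective_path, all_paths,
                   has_where=False, service_principal=False,
                   where_clause_reduction_pct=50, service_principal_reduction_pct=50):
    score = exposure_points(perms_by_verb) * compartments_in_scope(effective_path, all_paths)
    if has_where:
        score *= (100 - where_clause_reduction_pct) / 100
    # Further reduction only for service principals holding use/manage permissions.
    if service_principal and any(v in ("use", "manage") for v in perms_by_verb):
        score *= (100 - service_principal_reduction_pct) / 100
    return score


PATHS = ["ROOT", "ROOT/prod", "ROOT/prod/db", "ROOT/dev"]
read_perms = {"inspect": ["BUCKET_INSPECT"], "read": ["BUCKET_READ"]}
manage_perms = {"manage": ["BUCKET_CREATE", "BUCKET_DELETE"]}

tenancy_wide = potential_risk(read_perms, "ROOT", PATHS)                  # 6 points x 4 compartments
with_where = potential_risk(read_perms, "ROOT", PATHS, has_where=True)    # halved by the WHERE clause
service = potential_risk(manage_perms, "ROOT/prod", PATHS,
                         has_where=True, service_principal=True)          # 100 x 2, halved twice
```

The same statement scoped to `ROOT/prod` would see only two compartments, which is why narrowing a policy's location lowers its score even when the permissions are unchanged.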
- build_cleanup_items(enabled_check_ids=None)[source]
Analyze policy repository and collect actionable cleanup items for all key risk categories. This should be called BEFORE build_overall_recommendations.
- Parameters:
enabled_check_ids – If None, all checks run. Otherwise only run checks whose ID is in this list (use CLEANUP_CHECK_IDS). Disabled keys are set to empty list in overlay.
The lists are attached to self.overlay[“cleanup_items”].
- build_overall_recommendations(params: dict | None = None)[source]
Build the overall (user-facing) recommendations list for overlay[“recommendations”]. Each recommendation summarizes the count of existing actionable issues. Assumes build_cleanup_items has already been called and populated “cleanup_items”. If no real recommendations are found, yields one informational finding as a placeholder. params may include consolidation_strategy_names (list of display names) for workbench cross-link.
Example of recommendation dict:
{
    'Recommendation': 'Investigate invalid policy statements',
    'Priority': 'High',
    'Category': 'Policy Hygiene',
    'Notes': '3 invalid policy statement(s) detected. Review the cleanup/fix tab for details.',
    'Action': 'Plan: Review and remediate invalid policy statements',
    'ActionDetail': 'Examine policies with invalid statements and resolve as appropriate.',
}
This could change in the future to include risk score-based recommendations.
- build_policy_consolidation()[source]
Analyze policies/statements for possible consolidation opportunities and populate overlay[“consolidations”] with a list of dicts:
{
    "Statement": ...,
    "Policy Name(s)": ...,
    "Compartment": ...,
    "Principal": ...,
    "Service/Resource": ...,
    "Consolidation Reason": ...,
    "Action": ...,  # always present now
    "ActionDetail": ...  # optional, for detail/planning dialog
}
- Criteria:
Policies with only a single statement (likely consolidation candidate).
Statements with identical principal, compartment, and service/resource, but split across multiple differently-named policies (should suggest merge).
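The two criteria can be sketched with a pair of grouping passes. The statement keys used here (`policy_name`, `principal`, `compartment`, `resource`) and both function names are hypothetical; the engine's real statement dicts may be shaped differently.

```python
from collections import Counter, defaultdict

STATEMENTS = [
    {"policy_name": "net-admins", "principal": "group NetAdmins",
     "compartment": "prod", "resource": "virtual-network-family"},
    {"policy_name": "net-extra", "principal": "group NetAdmins",
     "compartment": "prod", "resource": "virtual-network-family"},
    {"policy_name": "net-extra", "principal": "group Auditors",
     "compartment": "prod", "resource": "buckets"},
    {"policy_name": "solo", "principal": "group Devs",
     "compartment": "dev", "resource": "instances"},
]


def single_statement_policies(statements: list) -> list:
    """Criterion 1: policies that contain exactly one statement."""
    counts = Counter(s["policy_name"] for s in statements)
    return sorted(name for name, n in counts.items() if n == 1)


def split_across_policies(statements: list) -> dict:
    """Criterion 2: same principal/compartment/resource spread over several policies."""
    groups = defaultdict(set)
    for s in statements:
        key = (s["principal"], s["compartment"], s["resource"])
        groups[key].add(s["policy_name"])
    return {key: sorted(names) for key, names in groups.items() if len(names) > 1}


singles = single_statement_policies(STATEMENTS)
splits = split_across_policies(STATEMENTS)
```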
Simulation Engine
- class oci_policy_analysis.logic.simulation_engine.PolicySimulationEngine(policy_repo=None, ref_data_repo=None)[source]
Bases: object
Central simulation service for OCI policies.
— Stateless simulation setup APIs (recommended for UI/MCP) —
These methods provide the canonical way to derive principal keys, resolve all applicable policy statements, and compute required where-clause parameter names, regardless of UI vs MCP flow.
- Step 1: Normalize compartment_path and principal_type/principal into principal_key (handles tuple/string, None/’default’, etc).
-> Use normalize_principal_key(principal_type, principal)
- Step 2: Find all statements for a context (compartment_path, principal_type, principal).
-> Use find_applicable_statements_for_context(compartment_path, principal_type, principal)
- Step 3: Get all required where-clause variable names for a context.
-> Use get_required_where_fields_for_context(compartment_path, principal_type, principal)
The MCP version always uses all valid applicable statements for a context.
Responsibilities:
* Given principal + effective compartment, generate the list of all applicable policy statements (including ‘any-user’).
* Build and cache a mapping of compartment -> principal -> [statements] for fast lookup.
* For a given policy context, extract all required where-clause fields, and evaluate conditional logic against input values.
* For a set of user choices (principal + compartment + where values), return the effective permission set and permission simulation trace.
* Fold in/replace all old where simulation logic found elsewhere (single source of truth).
* DO NOT load or store raw data; consumption only.
- Primary entrypoints:
get_applicable_statements(principal, compartment): List[dict]
get_required_where_fields(statements): Set[str]
simulate_permissions(principal, compartment, where_context: dict): (Set[str], List[dict])
simulate_api_operation(principal, compartment, operation, where_context: dict): dict (YES/NO + full trace)
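The compartment -> principal -> [statements] mapping mentioned in the responsibilities can be pictured as a nested dictionary. The sketch below is only an illustration of that lookup shape: the statement keys `effective_path` and `principal_key`, the literal "any-user" key, and both function names are assumptions, and it ignores any inheritance from parent compartments that the engine may apply.

```python
from collections import defaultdict


def build_index(statements: list) -> dict:
    """Group statements by effective compartment path, then by principal key."""
    index = defaultdict(lambda: defaultdict(list))
    for st in statements:
        index[st["effective_path"]][st["principal_key"]].append(st)
    return index


def applicable(index: dict, principal_key: str, effective_path: str) -> list:
    """Statements for the principal plus any 'any-user' statements in that compartment."""
    by_principal = index.get(effective_path, {})
    return by_principal.get(principal_key, []) + by_principal.get("any-user", [])


STATEMENTS = [
    {"effective_path": "ROOT", "principal_key": "group:default/Admins",
     "statement_text": "allow group Admins to manage all-resources in tenancy"},
    {"effective_path": "ROOT", "principal_key": "any-user",
     "statement_text": "allow any-user to inspect compartments in tenancy"},
]

index = build_index(STATEMENTS)
admins = applicable(index, "group:default/Admins", "ROOT")
others = applicable(index, "group:default/Readers", "ROOT")
```

Using `.get` on the outer mapping keeps lookups for unknown compartments from creating empty entries in the cache.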
- get_statements_for_context(compartment_path, principal_type, principal=None)[source]
Canonical: Return (principal_key, [statement dicts]) for a given policy simulation context (UI/MCP).
- Parameters:
- Returns:
Tuple with normalized principal key and list of matching statements.
- Return type:
(principal_key, statements)
See docs/source/simulation_engine_usage_examples.py for usage.
- get_required_where_fields_for_context(compartment_path, principal_type, principal=None)[source]
Canonical: Given a context, return (principal_key, set of required where input variables).
- Parameters:
- Returns:
Tuple with normalized principal key and set of all required where fields.
- Return type:
(principal_key, required_fields)
See docs/source/simulation_engine_usage_examples.py for usage.
- static extract_variable_names(cond_str)[source]
Extract variable names from a policy condition string using the OCI IAM Policy Condition parser.
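The method itself relies on the OCI IAM Policy Condition parser. As a rough stand-in for readers without that parser, the sketch below swaps in a regular expression: it blanks out quoted strings and /pattern/ literals, then collects dotted identifiers. It is an approximation, not the parser's behaviour, and the `conditions` key and function names are hypothetical.

```python
import re

# Quoted strings and /pattern/ literals are values, not variable names.
_LITERAL = re.compile(r"'[^']*'|\"[^\"]*\"|/[^/]*/")
# A variable is assumed to be a dotted identifier such as request.user.name.
_VARIABLE = re.compile(r"[A-Za-z_][\w-]*(?:\.[A-Za-z_][\w-]*)+")


def variable_names(cond_str: str) -> set:
    """Regex approximation of variable extraction from a condition string."""
    stripped = _LITERAL.sub(" ", cond_str)
    return set(_VARIABLE.findall(stripped))


def required_where_fields(statements: list) -> set:
    """Union of variable names across all statements' condition strings."""
    fields = set()
    for st in statements:
        fields |= variable_names(st.get("conditions") or "")  # assumed key name
    return fields


cond = "all {request.user.name = 'a.b', target.bucket.name = /prod.*/}"
names = variable_names(cond)
fields = required_where_fields([{"conditions": cond}, {"conditions": None}, {}])
```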
- get_prospective_statements() list[dict[str, Any]][source]
Return a shallow copy of all current prospective statements.
These statements are in addition to the regular policy statements loaded from the tenancy/cache and are used for “what-if” simulation scenarios. They are not written back to OCI.
- set_prospective_statements(statements: list[dict[str, Any]]) None[source]
Replace the current set of prospective statements.
The caller (UI/MCP) provides lightweight dicts with at least compartment_path, statement_text, and optional description. This method will:
* mark each as is_prospective = True
* ensure each has a unique internal_id
Persistence to disk/settings is orchestrated by the application; the engine simply owns the in-memory representation used for simulation.
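The two guarantees (every entry flagged as prospective, every entry uniquely identified) can be sketched as below. The ID format, the use of uuid, and the choice to copy rather than mutate the caller's dicts are assumptions for illustration; `prepare_prospective` is a hypothetical name.

```python
import uuid


def prepare_prospective(statements: list) -> list:
    """Return copies flagged as prospective, each with a unique internal_id."""
    seen = set()
    prepared = []
    for st in statements:
        item = dict(st)  # shallow copy so the caller's dict is left untouched
        item["is_prospective"] = True
        internal_id = item.get("internal_id")
        if not internal_id or internal_id in seen:
            # Missing or duplicate ID: generate a fresh one (format is an assumption).
            internal_id = f"prospective-{uuid.uuid4().hex}"
        item["internal_id"] = internal_id
        seen.add(internal_id)
        prepared.append(item)
    return prepared


raw = [
    {"compartment_path": "ROOT", "statement_text": "allow group A to read buckets in tenancy",
     "internal_id": "a"},
    {"compartment_path": "ROOT", "statement_text": "allow group B to read buckets in tenancy",
     "internal_id": "a"},
    {"compartment_path": "ROOT/dev", "statement_text": "allow group C to use instances in compartment dev"},
]
prepared = prepare_prospective(raw)
```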
- validate_prospective_statement(statement_text: str) dict[str, Any][source]
Validate and normalize a single prospective statement.
- Returns a dict with keys:
parsed: bool
valid: bool
invalid_reasons: list[str]
normalized: dict (if valid)
- simulate_and_record(principal_key: str, effective_path: str, api_operation: str, where_context: dict, checked_statement_ids: list[str] | None = None, trace_name: str | None = None, trace: bool = False) dict[str, Any][source]
Simulates permissions for a principal, compartment, and operation, recording full trace/results.
- Parameters:
principal_key (str) – The unique principal key (e.g., “group:default/Admins”) for the simulation.
effective_path (str) – The effective compartment path for simulation.
api_operation (str) – The API operation to check permission for.
where_context (dict) – Input/context for any evaluated ‘where’ policy clauses.
checked_statement_ids (Optional[list[str]]) – List of policy statement internal_ids to include in simulation. If omitted or None, all applicable statements for the context (principal_key and effective_path) are used.
trace_name (str, optional) – Optional name for this simulation trace/history.
trace (bool, optional) – If True, includes detailed step-by-step trace; else, summary only.
- Returns:
- Full simulation result. The engine always computes a full simulation trace
(permissions, context, per-statement evaluation). The return payload is shaped as follows:
- Top-level keys (always present):
api_call_allowed (bool): Final YES/NO decision for the API operation.
missing_permissions (list[str]): Permissions required by the operation but not granted.
required_permissions_for_api_operation (list[str]): All permissions the operation needs.
failure_reason (str): Empty if allowed, else human-readable explanation.
- Trace block (always present, but caller may choose whether to display it):
- simulation_trace (dict):
{
    'final_permission_set': [...],
    'simulation_context': {...},
    'permissions_denied': [...],
    'trace_statements': [...]  # present only when trace=True
}
- Notes:
The UI uses the trace flag to control how much of simulation_trace is displayed.
simulation_history always records the full simulation_trace (with trace_statements), independent of the trace flag used for the immediate UI response.
- Return type:
dict[str, Any]
Example
result = engine.simulate_and_record('group:default/Admins', 'ROOT', 'oci:ListBuckets', {}, checked_ids, trace=True)
result = engine.simulate_and_record('group:default/Admins', 'ROOT', 'oci:ListBuckets', {}, trace=True)  # auto selects statements
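How the top-level keys relate to each other can be shown with a small standalone sketch. It assumes the final permission set is the granted permissions minus the denied ones and that the failure reason simply lists what is missing; the real engine's wording and trace contents will differ, and `decide` is a hypothetical helper.

```python
def decide(required: list, granted: list, denied: list) -> dict:
    """Build a result shaped like the documented top-level keys (sketch only)."""
    final = set(granted) - set(denied)       # assumption: deny wins over allow
    missing = sorted(set(required) - final)
    allowed = not missing
    return {
        "api_call_allowed": allowed,
        "missing_permissions": missing,
        "required_permissions_for_api_operation": sorted(required),
        "failure_reason": "" if allowed else "Missing permissions: " + ", ".join(missing),
        "simulation_trace": {
            "final_permission_set": sorted(final),
            "permissions_denied": sorted(denied),
        },
    }


ok = decide(["BUCKET_INSPECT"], ["BUCKET_INSPECT", "BUCKET_READ"], [])
blocked = decide(["BUCKET_INSPECT"], ["BUCKET_INSPECT", "BUCKET_READ"], ["BUCKET_INSPECT"])
```

A caller would typically branch on `api_call_allowed` first and only surface `failure_reason` and `missing_permissions` when it is False.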
- get_simulation_trace_list() list[dict[str, str]][source]
Get the list of names and timestamps for all recorded simulation traces.
- Returns:
- List of dictionaries with keys ‘name’ and ‘timestamp’
corresponding to each simulation history entry.
- Return type:
list[dict[str, str]]
Example
[{'name': 'Simulation 1', 'timestamp': '2026-01-05T14:00:01'}, ...]
- get_simulation_trace_by_index(idx: int) dict[str, Any] | None[source]
Retrieve a previously recorded simulation trace result by history index.
- Parameters:
idx (int) – Index into simulation history (0 = oldest).
- Returns:
The trace result dict, or None if index is out of range.
- Return type:
dict[str, Any] | None
Example
>>> engine.get_simulation_trace_by_index(0)
{'result': 'YES', ...}
- get_simulation_trace_by_name(name: str) dict[str, Any] | None[source]
Retrieve a simulation trace result by name label.
- get_api_operations(filter_text: str = '') list[source]
Get all known API operation names, optionally filtered by substring.
- Parameters:
filter_text (str, optional) – Substring to filter operation names (case-insensitive). Defaults to ‘’ (all ops).
- Returns:
Sorted list of API operation names.
- Return type:
list
Example
>>> engine.get_api_operations('bucket')
['oci:CreateBucket', 'oci:ListBuckets', ...]
- get_applicable_statements(principal_key: str, effective_path: str) list[dict][source]
Get all applicable policy statements for a principal and compartment by building a proper PolicySearch filter using the principal_key and effective_path.
- Parameters:
- Returns:
List of matching policy statement dicts.
- Return type:
List[dict]
Notes
Used by all canonical orchestration flows (UI/MCP)
See usage examples in docs/source/simulation_engine_usage_examples.py
- get_required_where_fields(statements: list[dict]) set[str][source]
Extract all unique “where clause” input field names needed by a set of policy statements.
- Parameters:
statements (list[dict]) – The statements to examine for conditional/where fields.
- Returns:
Set of all unique field names required as variables for condition evaluation.
- Return type:
set[str]
Example
>>> fields = engine.get_required_where_fields(statements)
>>> print(fields)  # e.g. {'request.time', 'user.department'}