##########################################################################
# Copyright (c) 2024, Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
# DISCLAIMER: This is not an official Oracle application, and it is not supported by Oracle Support.
#
# data_repo.py
#
# @author: Andrew Gregory
#
# Supports Python 3.12 and above
#
# coding: utf-8
##########################################################################
# Standard library imports
import collections
import csv
import hashlib
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from pathlib import Path
# Third-party imports
from oci import config, pagination
from oci.auth.signers import InstancePrincipalsSecurityTokenSigner, SecurityTokenSigner
from oci.exceptions import ConfigFileNotFound, ServiceError
from oci.identity import IdentityClient
from oci.identity_domains import IdentityDomainsClient
from oci.identity_domains.models import DynamicResourceGroup
from oci.loggingsearch import LogSearchClient
from oci.loggingsearch.models import SearchLogsDetails, SearchResult
from oci.resource_search import ResourceSearchClient
from oci.resource_search.models import StructuredSearchDetails
from oci.signer import load_private_key_from_file
from oci_policy_analysis.common.logger import get_logger
from oci_policy_analysis.common.models import (
AdmitStatement,
BasePolicy,
BasePolicyStatement,
Compartment,
DefineStatement,
DynamicGroup,
DynamicGroupSearch,
EndorseStatement,
Group,
GroupSearch,
PolicySearch,
RegularPolicyStatement,
User,
UserSearch,
)
from oci_policy_analysis.logic.policy_statement_normalizer import PolicyStatementNormalizer
# Global logger for this module
logger = get_logger(component='data_repo')
# Constants
THREADS = 6
# Cache Directory and Date (for consistency across classes)
CACHE_DIR = Path.home() / '.oci-policy-analysis' / 'cache'
# For MCP-specific JSON
VALID_VERBS = {'inspect', 'read', 'use', 'manage'}
[docs]
class PolicyAnalysisRepository:
"""
This is the main data repository for Policy, Identity, and Compartment data
During initialization, the entire compartment hierarchy and policy tree is loaded into a central JSON dictionary.
This central dictionary is then referenced by functions that filter and return a subset of information for display.
Parsing, additional analysis, and import/export are made available by additional functions exposed.
Loading of data starts from `load_policies_and_compartments`, which loads all compartments and policies recursively
Filtering functions return lists of dataclass objects defined in models.py for easy consumption by UI or CLI layers.
See `filter_policy_statements` for an example of filtering and returning PolicyStatement objects.
"""
def _api_call_with_logging(self, label, fn, *args, **kwargs):
    """Invoke an OCI SDK call, timing it and logging the outcome.

    Success is logged at CRITICAL when settings['always_log_api_calls'] is
    truthy, otherwise at INFO. ServiceError and generic failures are logged
    at ERROR and re-raised unchanged.
    """
    # TODO: https://docs.oracle.com/en-us/iaas/tools/python/latest/exceptions.html
    always_log = False
    try:
        if self.settings and isinstance(self.settings, dict):
            always_log = self.settings.get('always_log_api_calls', False)
    except Exception:
        pass
    emit = logger.critical if always_log else logger.info
    fn_name = getattr(fn, '__name__', repr(fn))
    started = time.perf_counter()
    try:
        outcome = fn(*args, **kwargs)
    except ServiceError as se:
        elapsed = time.perf_counter() - started
        # Surface detailed ServiceError info before propagating
        logger.error(
            f'[API] {label} ({fn_name}) ServiceError after {elapsed:.2f}s: code={se.code} status={se.status} message={se.message} args={args}, kwargs={kwargs}'
        )
        raise
    except Exception as e:
        elapsed = time.perf_counter() - started
        # Generic failure path: log details and propagate
        logger.error(
            f'[API] {label} ({fn_name}) failed after {elapsed:.2f}s: {e} args={args}, kwargs={kwargs}'
        )
        raise
    elapsed = time.perf_counter() - started
    # Print more details about the API call on success
    emit(
        f'[API] {label} ({fn_name}) succeeded in {elapsed:.2f}s — args={args} kwargs={kwargs}'
    )
    return outcome
def __init__(self):
    """Create an empty repository; call initialize_client / load_* to populate it."""
    # Compartment dicts: {id, name, parent_id, hierarchy_path, hierarchy_ocids}
    self.compartments = []
    self.policies: list[BasePolicy] = []
    self.regular_statements: list[RegularPolicyStatement] = []
    self.cross_tenancy_statements = []
    # Define statements are stored as a list of dicts
    self.defined_aliases: list[DefineStatement] = []
    self.dynamic_groups = []
    self.identity_domains = []
    self.groups = []
    self.users: list[User] = []
    self.domain_clients = {}
    self.data_as_of = ''
    self.tenancy_ocid = None
    # OCI service clients (may be None when working offline/cache-only)
    self.identity_client = None
    self.logging_search_client = None
    self.resource_search_client = None
    self.limits_client = None
    self.identity_loaded_from_tenancy = False
    self.policies_loaded_from_tenancy = False
    self.version = 2
    self.load_all_users = True
    # Settings controlling logging/behavior (injected by App)
    self.settings = None
    # Reference data repo is injected later by the app; kept as a member here.
    self.permission_reference_repo = None
    logger.info('Initialized PolicyAnalysisRepo')
    # Centralized statement normalizer used by all _parse_* wrappers
    self.normalizer = PolicyStatementNormalizer()
    # Cached tenancy-wide policy statement limit (fetch once per run)
    self.tenancy_policy_statement_limit = None
[docs]
def reset_state(self):
    """
    Resets all main state variables (lists, dictionaries, flags, clients, IDs, etc.).
    Call this before any data (re)load operation for a clean repository state.

    The injected permission_reference_repo is deliberately left untouched.
    """
    self.compartments = []
    self.policies = []
    self.regular_statements = []
    self.cross_tenancy_statements = []
    self.defined_aliases = []
    self.dynamic_groups = []
    self.identity_domains = []
    self.groups = []
    self.users = []
    self.domain_clients = {}
    self.data_as_of = ''
    self.tenancy_ocid = None
    # Reset all OCI service clients so that any previous tenancy context
    # does not leak across cache/JSON/CIS loads. Callers that need a
    # client must either re-run initialize_client or gracefully handle
    # the None case.
    self.identity_client = None
    self.logging_search_client = None
    self.resource_search_client = None
    self.limits_client = None
    self.identity_loaded_from_tenancy = False
    self.policies_loaded_from_tenancy = False
    # Keep in sync with __init__, which initializes version to 2. This was
    # previously reset to 1 here, leaving the repository in a state that
    # diverged from a freshly constructed instance.
    self.version = 2
    self.load_all_users = True
    # Do not replace permission_reference_repo: it is injected by the app (main) and
    # must remain the loaded ReferenceDataRepo so risk scoring and permission lookups work.
    # If there are additional ephemeral analysis/cache attributes, reset them here
    # (e.g., self._policy_progress_queue, self.normalizer, cached_*, etc.)
    logger.info('PolicyAnalysisRepository state has been reset.')
[docs]
def initialize_client(
    self,
    use_instance_principal: bool,
    session_token: str | None = None,
    recursive: bool = True,
    profile: str = 'DEFAULT',
) -> bool:
    """Initializes the OCI clients to be used for all data operations.

    Clients can be created using Instance Principal, a session-token profile,
    or a standard config-file PROFILE authentication method.

    Args:
        use_instance_principal: Whether to attempt Instance Principal signer-based authentication
        session_token: The named OCI Session Token Profile to use - must be present on the
            file system in the standard OCI location of .oci/config
        recursive: Whether to load tenancy data across all compartments, or simply the root
            (tenancy) compartment
        profile: The named OCI Profile to use - must be present on the file system in the
            standard OCI location of .oci/config

    Returns:
        A boolean indicating whether the client was created successfully. False indicates
        that an unrecoverable issue occurred setting up the client.
    """
    self.session_token = session_token
    self.use_instance_principal = use_instance_principal
    try:
        # Imported lazily so module import stays light when limits are unused
        from oci.limits import LimitsClient

        if use_instance_principal:
            logger.debug('Using Instance Principal Authentication')
            self.signer = InstancePrincipalsSecurityTokenSigner()
            # Identity for all policy Data
            self.identity_client = IdentityClient(config={}, signer=self.signer)
            self.logging_search_client = LogSearchClient(config={}, signer=self.signer)
            self.resource_search_client = ResourceSearchClient(config={}, signer=self.signer)
            self.limits_client = LimitsClient(config={}, signer=self.signer)
            self.tenancy_ocid = self.signer.tenancy_id
        elif session_token:
            logger.info('Attempt session auth')
            self.config = config.from_file(profile_name=session_token)
            token_file = self.config['security_token_file']
            with open(token_file) as f:
                token = f.read()
            private_key = load_private_key_from_file(self.config['key_file'])
            self.signer = SecurityTokenSigner(token, private_key)
            region_config = {'region': self.config['region']}
            self.identity_client = IdentityClient(region_config, signer=self.signer)
            # Fix: the session-token path previously never created a
            # LogSearchClient, leaving logging_search_client None while the
            # other auth paths initialized it.
            self.logging_search_client = LogSearchClient(region_config, signer=self.signer)
            self.resource_search_client = ResourceSearchClient(region_config, signer=self.signer)
            self.limits_client = LimitsClient(region_config, signer=self.signer)
            self.tenancy_ocid = self.config['tenancy']
            logger.info('Success session auth')
        else:
            logger.debug(f'Using Profile Authentication: {profile}')
            self.config = config.from_file(profile_name=profile)
            self.identity_client = IdentityClient(self.config)
            self.logging_search_client = LogSearchClient(self.config)
            self.tenancy_ocid = self.config['tenancy']
            self.resource_search_client = ResourceSearchClient(self.config)
            self.limits_client = LimitsClient(self.config)
        logger.info(f'Set up Identity Client for tenancy: {self.tenancy_ocid}')
        # Set Recursion
        self.recursive = recursive
        logger.debug(f'Set recursive to: {self.recursive}')
        # Get tenancy name
        self.tenancy_name = self.identity_client.get_compartment(compartment_id=self.tenancy_ocid).data.name
        logger.info(f'Initialized client for tenancy: {self.tenancy_name} ({self.tenancy_ocid})')
        return True
    except Exception as exc:
        # ConfigFileNotFound is an Exception subclass, so a single broad
        # handler suffices; any failure here is unrecoverable for the caller.
        logger.fatal(f'Authentication failed: {exc}')
        return False
[docs]
def check_statement_location_validity(self, st):
    """
    Checks if the compartment location for a statement is valid (exists and is ACTIVE).

    Args:
        st: The policy statement (dict).

    Returns:
        None if valid; string message if invalid.
    """
    # Only OCID-based locations are checked; named locations are left alone
    if st.get('location_type') != 'compartment id':
        return None
    logger.info(f'Checking location validity for statement: {st.get("statement_text")}')
    location_ocid = st.get('location')
    if self._check_invalid_location(location_ocid):
        return None
    return f'Compartment OCID {location_ocid} not found in tenancy'
def _check_invalid_location(self, compartment_ocid) -> bool:
    """
    Given a compartment OCID-based location, return True if the compartment
    exists and is ACTIVE; False if it is missing (any more) or not ACTIVE.

    Called from Policy IntelligenceEngine.find_invalid_statements() - only done
    here because of the OCI Client needed.
    """
    try:
        comp: Compartment = self.identity_client.get_compartment(compartment_id=compartment_ocid).data
        if comp.lifecycle_state != Compartment.LIFECYCLE_STATE_ACTIVE:
            logger.warning(f'Found Compartment but not ACTIVE: {compartment_ocid} was: {comp.lifecycle_state}')
            return False
        return True
    except Exception as e:
        # Any error means it is invalid
        logger.debug(f'Compartment OCID {compartment_ocid} not valid: {e}')
        return False
def _parse_define_statement(self, policy: BasePolicy, statement: DefineStatement) -> bool:
    """Thin wrapper delegating a `define` statement to PolicyStatementNormalizer.

    Appends the normalized (or failure-annotated) statement to
    self.defined_aliases. Returns True on success, False otherwise.
    """
    meta_keys = (
        'policy_name',
        'policy_ocid',
        'compartment_ocid',
        'compartment_path',
        'creation_time',
        'internal_id',
    )
    try:
        # Carry the base model's required metadata into the normalizer
        base = {key: statement[key] for key in meta_keys if key in statement}
        normalized = self.normalizer.normalize(
            statement_text=statement['statement_text'], statement_type='define', base_fields=base
        )
        if isinstance(normalized, dict) and not normalized.get('parsed', True):
            # Convert to a plain dict so failure fields can be attached
            failed = dict(statement)
            failed['parsed'] = False
            failed['valid'] = False
            failed['invalid_reasons'] = normalized.get('invalid_reasons', [])
            logger.debug(
                f'Define statement was unable to normalize: {failed.get("statement_text")} | Reason: {failed.get("invalid_reasons")}'
            )
            self.defined_aliases.append(failed)
            return False
        self.defined_aliases.append(normalized)
        logger.debug(f'Define Statement Added: {normalized}')
        return True
    except Exception as e:
        statement['parsed'] = False
        statement['valid'] = False
        statement['invalid_reasons'] = [f'Normalize define statement failed: {e}']
        logger.debug(f'Normalize define statement failed: {e}')
        self.defined_aliases.append(statement)
        return False
def _parse_admit_statement(self, policy: BasePolicy, statement: AdmitStatement) -> bool:
    """Thin wrapper delegating an `admit` statement to PolicyStatementNormalizer.

    Appends the normalized (or failure-annotated) statement to
    self.cross_tenancy_statements. Returns True on success, False otherwise.
    """
    meta_keys = (
        'policy_name',
        'policy_ocid',
        'compartment_ocid',
        'compartment_path',
        'creation_time',
        'internal_id',
    )
    try:
        # Carry the base model's required metadata into the normalizer
        base = {key: statement[key] for key in meta_keys if key in statement}
        normalized = self.normalizer.normalize(
            statement_text=statement['statement_text'], statement_type='admit', base_fields=base
        )
        if isinstance(normalized, dict) and not normalized.get('parsed', True):
            # Convert to a plain dict so failure fields can be attached
            failed = dict(statement)
            failed['parsed'] = False
            failed['valid'] = False
            failed['invalid_reasons'] = normalized.get('invalid_reasons', [])
            logger.debug(
                f"Admit statement was unable to normalize: {failed.get('statement_text')} | Reason: {failed.get('invalid_reasons')}"
            )
            self.cross_tenancy_statements.append(failed)
            return False
        self.cross_tenancy_statements.append(normalized)
        logger.debug(f'Admit Statement Added: {normalized}')
        return True
    except Exception as ex:
        statement['valid'] = False
        statement['parsed'] = False
        statement['invalid_reasons'] = [f'Normalize admit parser failed: {ex}']
        logger.debug(f'Normalize admit parser failed: {ex}')
        self.cross_tenancy_statements.append(statement)
        return False
def _parse_endorse_statement(self, policy: BasePolicy, statement: EndorseStatement) -> bool:
    """Thin wrapper delegating an `endorse` statement to PolicyStatementNormalizer.

    Appends the normalized (or failure-annotated) statement to
    self.cross_tenancy_statements. Returns True on success, False otherwise.
    """
    meta_keys = (
        'policy_name',
        'policy_ocid',
        'compartment_ocid',
        'compartment_path',
        'creation_time',
        'internal_id',
    )
    try:
        # Carry the base model's required metadata into the normalizer
        base = {key: statement[key] for key in meta_keys if key in statement}
        normalized = self.normalizer.normalize(
            statement_text=statement['statement_text'], statement_type='endorse', base_fields=base
        )
        if isinstance(normalized, dict) and not normalized.get('parsed', True):
            # Convert to a plain dict so failure fields can be attached
            failed = dict(statement)
            failed['parsed'] = False
            failed['valid'] = False
            failed['invalid_reasons'] = normalized.get('invalid_reasons', [])
            logger.debug(
                f"Endorse statement was unable to normalize: {failed.get('statement_text')} | Reason: {failed.get('invalid_reasons')}"
            )
            self.cross_tenancy_statements.append(failed)
            return False
        self.cross_tenancy_statements.append(normalized)
        logger.debug(f'Endorse Statement Added: {normalized}')
        return True
    except Exception as ex:
        statement['valid'] = False
        statement['parsed'] = False
        statement['invalid_reasons'] = [f'Normalize endorse parser failed: {ex}']
        logger.debug(f'Normalize endorse parser failed: {ex}')
        self.cross_tenancy_statements.append(statement)
        return False
def _resolve_ocid_subjects_in_statement(self, stmt: RegularPolicyStatement):
"""
If the statement has subject_type group or dynamic-group and all subjects are OCIDs,
replace each OCID with (domain, name) if resolvable, otherwise ('Unknown', ocid).
Mark as invalid if any unresolved OCIDs. Add parsing_notes for both resolution and unresolved cases.
This is done in-place on the statement dict.
"""
subject_type = stmt.get('subject_type')
subjects = stmt.get('subject', [])
if not (subject_type in ('group', 'dynamic-group') and isinstance(subjects, list)):
return
# Detect if all subjects are in OCID format (no tuple/list inside)
all_ocids = all(isinstance(s, str) and s.lower().startswith('ocid1.') for s in subjects)
if not all_ocids:
return
resolved_subjects = []
unresolved_ocids = []
for ocid in subjects:
if subject_type == 'group':
grp = next((g for g in self.groups if g.get('group_ocid', '').lower() == ocid.lower()), None)
if grp:
dom = grp.get('domain_name') or 'Default'
name = grp.get('group_name') or ocid
resolved_subjects.append((dom, name))
else:
resolved_subjects.append(('Unknown', ocid))
unresolved_ocids.append(ocid)
elif subject_type == 'dynamic-group':
dg = next(
(d for d in self.dynamic_groups if d.get('dynamic_group_ocid', '').lower() == ocid.lower()), None
)
if dg:
dom = dg.get('domain_name') or 'Default'
name = dg.get('dynamic_group_name') or ocid
resolved_subjects.append((dom, name))
else:
resolved_subjects.append(('Unknown', ocid))
unresolved_ocids.append(ocid)
stmt['subject'] = resolved_subjects
notes = stmt.setdefault('parsing_notes', [])
if len(unresolved_ocids) > 0:
notes.append(f"Failed to resolve OCID(s): {', '.join(unresolved_ocids)}; inserted as ('Unknown', ocid)")
stmt['valid'] = False
else:
notes.append('All OCID subject(s) resolved to domain/name tuple(s).')
def _parse_statement(self, policy: BasePolicy, statement: RegularPolicyStatement) -> bool:
    """
    Thin wrapper delegating a regular policy statement to PolicyStatementNormalizer.

    On success the normalized statement has its OCID subjects resolved to
    (domain, name) tuples and is appended to self.regular_statements. On
    failure the statement is appended annotated with parsed/valid=False and
    the failure reasons.

    Args:
        policy: The owning policy (kept for signature parity with the other
            _parse_* wrappers).
        statement: The raw statement record to normalize.

    Returns:
        True if the statement normalized cleanly, False otherwise.
    """
    try:
        # Use the base model's fields for the required metadata
        base = {
            k: statement[k]
            for k in [
                'policy_name',
                'policy_ocid',
                'compartment_ocid',
                'compartment_path',
                'creation_time',
                'internal_id',
            ]
            if k in statement
        }
        normalized = self.normalizer.normalize(
            statement_text=statement['statement_text'], statement_type='regular', base_fields=base
        )
        if isinstance(normalized, dict) and not normalized.get('parsed', True):
            # Convert to a plain dict so failure fields can be attached
            statement_dict = dict(statement)
            statement_dict['action'] = 'unknown'
            statement_dict['parsed'] = False
            statement_dict['valid'] = False
            statement_dict['invalid_reasons'] = normalized.get('invalid_reasons', [])
            logger.debug(
                f"Regular statement was unable to normalize: {statement_dict.get('statement_text')} | Reason: {statement_dict.get('invalid_reasons')}"
            )
            logger.debug(f'Full invalid statement data: {statement_dict}')
            self.regular_statements.append(statement_dict)
            return False
        # OCID subject resolution step
        self._resolve_ocid_subjects_in_statement(normalized)
        self.regular_statements.append(normalized)
        # Fix: this success message was previously emitted twice in a row.
        logger.debug(f'Regular Policy Statement Parsed: {normalized}')
        return True
    except Exception as ex:
        statement['parsed'] = False
        statement['valid'] = False
        statement['invalid_reasons'] = [f'Normalize regular policy parser failed: {ex}']
        logger.debug(f'Normalize regular policy parser failed: {ex}')
        self.regular_statements.append(statement)
        return False
def _parse_dynamic_group(self, domain, dg: DynamicResourceGroup) -> DynamicGroup:
    """Convert an SDK DynamicResourceGroup into the internal DynamicGroup model."""
    logger.debug(f'Created by: {dg.idcs_created_by}')
    creator = dg.idcs_created_by
    return DynamicGroup(
        domain_name=domain.display_name,
        domain_ocid=domain.id,
        dynamic_group_name=dg.display_name,
        dynamic_group_id=dg.id,
        description=dg.description or '',
        matching_rule=dg.matching_rule,
        in_use=True,  # Placeholder until analysis is run
        dynamic_group_ocid=dg.ocid,
        creation_time=str(dg.meta.created),
        created_by_ocid=creator.ocid if creator else None,
        created_by_name=creator.display if creator else None,
    )
# --- Main Data Loading Functions for Tenancy ---
[docs]
def load_compartments_only(self) -> bool:
    """
    Loads only compartments (hierarchy, flat) using OCI Clients.

    Fetches the root (tenancy) compartment plus all ACCESSIBLE/ACTIVE
    sub-compartments, flattens free-form and defined tags, and computes a
    hierarchy path for each compartment.

    Returns:
        True on success, False on any failure (logged).
    """
    self.compartments = []
    start_time = time.perf_counter()
    try:
        logger.info('Bulk fetching all compartments...')
        root_comp_response = self._api_call_with_logging(
            'IdentityClient.get_compartment', self.identity_client.get_compartment, compartment_id=self.tenancy_ocid
        )
        if not root_comp_response or not root_comp_response.data:
            logger.error(f'Failed to get root compartment: {self.tenancy_ocid}')
            return False
        root_comp = root_comp_response.data
        comp_response = self._api_call_with_logging(
            'IdentityClient.list_compartments',
            pagination.list_call_get_all_results,
            self.identity_client.list_compartments,
            self.tenancy_ocid,
            access_level='ACCESSIBLE',
            sort_order='ASC',
            compartment_id_in_subtree=True,
            lifecycle_state='ACTIVE',
            limit=1000,
        )
        all_comps = [root_comp] + (list(comp_response.data) if comp_response and comp_response.data else [])
        logger.info(f'Total compartments loaded: {len(all_comps)}')
        # Build our internal list with hierarchy paths - also extract tags if present.
        # (self.compartments was already reset above; the previous redundant
        # clear() call here has been removed.)
        for comp in all_comps:
            # Flatten tags for display: free-form as-is, defined as "namespace:key"
            tags = {}
            if hasattr(comp, 'freeform_tags') and comp.freeform_tags:
                tags.update(comp.freeform_tags)
            if hasattr(comp, 'defined_tags') and comp.defined_tags:
                for ns, val in comp.defined_tags.items():
                    if isinstance(val, dict):
                        for k, v in val.items():
                            tags[f'{ns}:{k}'] = v
                    else:
                        tags[ns] = val
            compartment = Compartment(
                id=comp.id,
                name=(comp.name if comp.id != self.tenancy_ocid else 'ROOT'),
                parent_id=comp.compartment_id,
                hierarchy_path='',  # filled in below once all compartments are known
                description=getattr(comp, 'description', '') or '',
                lifecycle_state=getattr(comp, 'lifecycle_state', '') or '',
                **({'tags': tags} if tags else {}),
            )
            self.compartments.append(compartment)
        logger.info('Building compartment hierarchy paths and lookup tables...')
        for compartment in self.compartments:
            compartment['hierarchy_path'] = self._get_hierarchy_path_for_compartment(compartment, '')
        total_time = time.perf_counter() - start_time
        logger.info(f'Loaded {len(self.compartments)} compartments in {total_time:.2f}s')
        return True
    except Exception as e:
        logger.error(f'Failed to load compartments: {e}')
        return False
[docs]
def load_policies_only(self) -> bool:  # noqa: C901
    """
    Loads policies/statements only, assuming compartments are already loaded.

    Finds policies via Resource Search (tenancy-wide when self.recursive,
    otherwise root compartment only), then fetches each policy's details and
    statements in a thread pool and dispatches every statement to the
    appropriate _parse_* wrapper based on its leading keyword.

    Returns:
        True on success, False on any failure (logged).
    """
    self.policies = []
    self.regular_statements = []
    self.cross_tenancy_statements = []
    self.defined_aliases = []
    # Ensure compliance flag is reset on live tenancy load
    self.loaded_from_compliance_output = False
    start_time = time.perf_counter()
    try:
        logger.info('Bulk fetching all policies for all compartments...')
        if self.recursive:
            policy_query = 'query policy resources'
        else:
            policy_query = f"query policy resources where compartmentId = '{self.tenancy_ocid}'"
        # Run policy search and then for each result, fetch the full policy details and statements - do this in threads for speed
        policy_search_results = self._api_call_with_logging(
            'ResourceSearchClient.search_resources',
            self.resource_search_client.search_resources,
            search_details=StructuredSearchDetails(type='Structured', query=policy_query),
            limit=1000,
        )
        if policy_search_results and policy_search_results.data and policy_search_results.data.items:
            logger.info(
                f'Found {len(policy_search_results.data.items)} policies via Resource Search (recursive={self.recursive}).'
            )
            total_policies = len(policy_search_results.data.items)

            def _process_policy_resource(item, position, total_policies):  # noqa: C901
                # Worker: fetch one policy and dispatch each of its statements.
                # NOTE(review): position/total_policies are not used in this body;
                # presumably intended for progress reporting — confirm before removing.
                policy_ocid = item.identifier
                compartment_ocid = item.compartment_id
                try:
                    policy_response = self._api_call_with_logging(
                        'IdentityClient.get_policy', self.identity_client.get_policy, policy_id=policy_ocid
                    )
                    if policy_response and policy_response.data:
                        # Extract tags (keep original structure for round-trip)
                        freeform_tags = {}
                        defined_tags = {}
                        if hasattr(policy_response.data, 'freeform_tags') and policy_response.data.freeform_tags:
                            freeform_tags = dict(policy_response.data.freeform_tags)
                        if hasattr(policy_response.data, 'defined_tags') and policy_response.data.defined_tags:
                            try:
                                defined_tags = dict(policy_response.data.defined_tags)
                            except Exception:
                                defined_tags = {}
                        # Flatten tags for UI display (namespace:key for defined tags)
                        tags = {}
                        if freeform_tags:
                            tags.update(freeform_tags)
                        if defined_tags:
                            for ns, val in defined_tags.items():
                                if isinstance(val, dict):
                                    for k, v in val.items():
                                        tags[f'{ns}:{k}'] = v
                                else:
                                    tags[str(ns)] = str(val)
                        # Resolve the policy's compartment to its hierarchy path
                        comp_path = next(
                            (
                                comp['hierarchy_path']
                                for comp in self.compartments
                                if comp['id'] == policy_response.data.compartment_id
                            ),
                            'ROOT',
                        )
                        policy_obj = BasePolicy(
                            policy_ocid=policy_response.data.id,
                            policy_name=policy_response.data.name,
                            description=policy_response.data.description or '',
                            compartment_ocid=policy_response.data.compartment_id,
                            compartment_path=comp_path,
                            creation_time=policy_response.data.time_created,
                            tags=tags if tags else None,
                            freeform_tags=freeform_tags if freeform_tags else None,
                            defined_tags=defined_tags if defined_tags else None,
                        )
                        self.policies.append(policy_obj)
                        for statement in policy_response.data.statements:
                            hierarchy_path = next(
                                (
                                    comp['hierarchy_path']
                                    for comp in self.compartments
                                    if comp['id'] == compartment_ocid
                                ),
                                'UNKNOWN_PATH',
                            )
                            # internal_id: stable md5 over statement text + policy OCID
                            base_policy_statement: BasePolicyStatement = BasePolicyStatement(
                                policy_name=policy_response.data.name,
                                policy_ocid=policy_response.data.id,
                                compartment_ocid=policy_response.data.compartment_id,
                                compartment_path=hierarchy_path,
                                statement_text=statement,
                                creation_time=str(policy_response.data.time_created),
                                internal_id=hashlib.md5((statement + policy_response.data.id).encode()).hexdigest(),
                                parsed=False,
                            )
                            # Dispatch on the statement's leading keyword
                            st_text_lower = statement.strip().lower()
                            if st_text_lower.startswith('define'):
                                define_statement: DefineStatement = DefineStatement(**base_policy_statement)
                                self._parse_define_statement(policy_obj, define_statement)
                            elif (
                                st_text_lower.startswith('admit')
                                or st_text_lower.startswith('endorse')
                                or st_text_lower.startswith('deny admit')
                                or st_text_lower.startswith('deny endorse')
                            ):
                                # Cross-tenancy statements (admit/endorse and their deny forms)
                                if st_text_lower.startswith('admit') or st_text_lower.startswith('deny admit'):
                                    admit_statement: AdmitStatement = AdmitStatement(**base_policy_statement)
                                    self._parse_admit_statement(policy_obj, admit_statement)
                                elif st_text_lower.startswith('endorse') or st_text_lower.startswith(
                                    'deny endorse'
                                ):
                                    endorse_statement: EndorseStatement = EndorseStatement(**base_policy_statement)
                                    self._parse_endorse_statement(policy_obj, endorse_statement)
                            else:
                                # Everything else is a regular allow/deny statement
                                policy_statement: RegularPolicyStatement = RegularPolicyStatement(
                                    **base_policy_statement
                                )
                                self._parse_statement(policy_obj, policy_statement)
                except Exception as e:
                    logger.warning(
                        f'Failed to get policy {policy_ocid}: {e}. '
                        'This may be expected if the policy was deleted as part of a consolidation plan execution.'
                    )

            # Fan out one worker per policy; the context manager waits for all
            # submitted work before exiting (workers handle their own errors).
            with ThreadPoolExecutor(max_workers=THREADS) as executor:
                for idx, item in enumerate(policy_search_results.data.items):
                    executor.submit(_process_policy_resource, item, idx, total_policies)
        self.data_as_of = str(datetime.now(UTC))
        total_time = time.perf_counter() - start_time
        logger.info(f'Bulk loaded {len(self.regular_statements)} policy statements in {total_time:.2f}s')
        self._enrich_compartments_with_statement_counts()
        self.policies_loaded_from_tenancy = True
        return True
    except Exception as e:
        logger.error(f'Failed to load policies: {e}')
        return False
[docs]
def load_policies_and_compartments(self) -> bool: # noqa: C901
"""
Loads both compartments and all policies using OCI Clients. (Convenience function)
"""
# Always reset reload timestamp unless restored/preserved by special path (e.g., cache); complies with cache/offline/compliance logic elsewhere too.
self.policy_data_reloaded = None
# Ensure compliance flag is reset on live tenancy load
self.loaded_from_compliance_output = False
ok1 = self.load_compartments_only()
if not ok1:
return False
ok2 = self.load_policies_only()
return ok2
[docs]
def reload_compartment_policy_data(self) -> bool:
    """
    Reload just the policy/compartment/statement data (not IAM), and update the in-memory timestamp.
    (No cache operations here—see main.py/App for cache update and UI triggers.)

    Returns:
        bool: True if the reload succeeded, False otherwise.
    """
    logger.info('Reloading only compartment+policy+statement data (not IAM)... (No cache ops in repo)')
    if not self.load_policies_and_compartments():
        logger.error('Policy/compartment reload failed!')
        return False
    # Stamp the reload time only on success
    self.policy_data_reloaded = datetime.now(UTC).isoformat()
    return True
[docs]
def fetch_tenancy_policy_statement_limits(self):
    """
    Fetch two key limits from OCI Limits service ("Identity"):
      - policies-count (max policies in tenancy)
      - statements-count (max statements per policy)

    Uses _api_call_with_logging to time/log the call; the result is also
    cached on self.tenancy_policy_statement_limit.

    Returns:
        A tuple (policies_count_limit, statements_per_policy_limit), or
        (None, None) if unavailable or on error.
    """
    # Dedicated logger component for limit fetches (avoids shadowing the
    # module-level logger while keeping the same log component tag).
    limits_logger = get_logger(component='limits_fetch')
    policies_count = None
    statements_count = None
    try:
        if not self.tenancy_ocid:
            limits_logger.error('No tenancy_ocid set; cannot fetch OCI policy limits')
            return (None, None)
        limits_client = getattr(self, 'limits_client', None)
        if not limits_client:
            limits_logger.error('No limits_client found. Did you run initialize_client first?')
            return (None, None)
        result = self._api_call_with_logging(
            'LimitsClient.list_limit_values (identity)',
            limits_client.list_limit_values,
            service_name='identity',
            compartment_id=self.tenancy_ocid,
        )
        limits = result.data if hasattr(result, 'data') else []
        if not limits:
            limits_logger.error('No limits returned from OCI API for identity service.')
            self.tenancy_policy_statement_limit = (None, None)
            return (None, None)
        for limit in limits:
            limit_name = getattr(limit, 'name', None)
            if limit_name == 'policies-count':
                policies_count = getattr(limit, 'value', None)
                limits_logger.info(f'Limit: {limit}')
            elif limit_name == 'statements-count':
                statements_count = getattr(limit, 'value', None)
                limits_logger.info(f'Limit: {limit}')
        self.tenancy_policy_statement_limit = (policies_count, statements_count)
        if policies_count is None or statements_count is None:
            limits_logger.warning(
                'Failed to find some limit values: policies-count=%s, statements-count=%s',
                str(policies_count),
                str(statements_count),
            )
        return (policies_count, statements_count)
    except Exception as e:
        limits_logger.error(f'[API] list_limit_values failed: {e}')
        self.tenancy_policy_statement_limit = (None, None)
        return (None, None)
# --- Internal fetchers for Identity Domain entities ---
def _fetch_dynamic_groups_for_domain(self, domain, domain_client):
    """
    Fetch all dynamic groups for a domain, returning a list of DynamicGroup model objects.

    Uses ThreadPoolExecutor to fetch full details in parallel; each parsed
    group is also appended incrementally to self.dynamic_groups for UI
    consumption. Returns [] if the listing call or the overall fetch fails.
    """
    import threading

    logger.info(f'Fetching dynamic groups for domain: {domain.display_name}')
    dg_list = []
    # Guards self.dynamic_groups against concurrent appends from workers
    dg_lock = threading.Lock()
    try:
        dg_response = self._api_call_with_logging(
            'IdentityDomainsClient.list_dynamic_resource_groups',
            domain_client.list_dynamic_resource_groups,
            attribute_sets=['never'],
        )
        if not (dg_response and dg_response.data):
            logger.error('Failed to list dynamic groups')
            return []
        logger.debug(f'Got the List of DG for {domain.display_name}. Count: {len(dg_response.data.resources)}')

        def fetch_full_dg(_dg):
            # Fetch one dynamic group with full attributes, parse, and record it
            try:
                thread_id = threading.get_ident()
                thread_name = threading.current_thread().name
                logger.debug(
                    f"Thread {thread_name} (id={thread_id}) starting fetch_full_dg for dg_id={getattr(_dg, 'id', None)} display_name={getattr(_dg, 'display_name', None)}"
                )
                full_dg = self._api_call_with_logging(
                    'IdentityDomainsClient.get_dynamic_resource_group',
                    domain_client.get_dynamic_resource_group,
                    dynamic_resource_group_id=_dg.id,
                    attribute_sets=['all'],
                ).data
                logger.debug(
                    f"Thread {thread_name} (id={thread_id}) finished fetch_full_dg for dg_id={getattr(_dg, 'id', None)} display_name={getattr(_dg, 'display_name', None)}"
                )
                parsed = self._parse_dynamic_group(domain=domain, dg=full_dg)
                with dg_lock:
                    self.dynamic_groups.append(parsed)
                return parsed
            except Exception as e:
                logger.error(f'Failed to fetch dynamic group details for: {_dg.id}: {e}')
                return None

        # ThreadPoolExecutor/as_completed come from the module-level import;
        # the previous redundant local re-import has been removed.
        with ThreadPoolExecutor(max_workers=THREADS) as executor:
            futures = [executor.submit(fetch_full_dg, _dg) for _dg in dg_response.data.resources]
            for f in as_completed(futures):
                result = f.result()
                if result:
                    dg_list.append(result)
    except Exception as e:
        logger.error(f'Exception during dynamic group fetch: {e}')
        return []
    logger.info(f'Fetched {len(dg_list)} dynamic groups for domain: {domain.display_name}')
    return dg_list
def _fetch_groups_for_domain(self, domain, domain_client):
    """
    Fetch all groups for a domain, returning a list of Group model objects.

    Pages through IdentityDomainsClient.list_groups (1000 per page) and converts
    each group summary into a Group model in parallel with a ThreadPoolExecutor.
    Each Group is also appended incrementally to self.groups (under a lock) so
    the UI can observe progress.

    Args:
        domain: Identity domain summary; must expose display_name.
        domain_client (IdentityDomainsClient): Client scoped to the domain endpoint.

    Returns:
        list[Group]: All groups fetched for the domain; empty list on error.
    """
    import threading
    logger.info(f'Fetching groups for domain: {domain.display_name}')
    group_list = []
    group_lock = threading.Lock()
    try:
        start_index = 1
        limit = 1000

        def fetch_full_group(g):
            """Convert one group summary into a Group model (thread-pool worker)."""
            try:
                thread_id = threading.get_ident()
                thread_name = threading.current_thread().name
                logger.debug(
                    f"Thread {thread_name} (id={thread_id}) starting fetch_full_group for group_id={getattr(g, 'id', None)} display_name={getattr(g, 'display_name', None)}"
                )
                # The description lives on the IDCS group extension, which may be absent.
                extension = getattr(g, 'urn_ietf_params_scim_schemas_oracle_idcs_extension_group_group', None)
                group_obj = Group(
                    domain_name=domain.display_name,
                    group_name=g.display_name,
                    group_ocid=g.ocid,
                    group_id=g.id,
                    description=extension.description if extension else '',
                )
                # Append incrementally under the lock so GUI callbacks can read safely.
                with group_lock:
                    self.groups.append(group_obj)
                logger.debug(
                    f"Thread {thread_name} (id={thread_id}) finished fetch_full_group for group_id={getattr(g, 'id', None)} display_name={getattr(g, 'display_name', None)}"
                )
                return group_obj
            except Exception as e:
                logger.error(f"Failed to process group details for: {getattr(g, 'id', None)}: {e}")
                return None

        # NOTE: ThreadPoolExecutor / as_completed come from the module-level import;
        # the previous redundant function-local import was removed.
        while True:
            group_response = self._api_call_with_logging(
                'IdentityDomainsClient.list_groups',
                domain_client.list_groups,
                start_index=start_index,
                count=limit,
                sort_by='displayName',
                sort_order='ASCENDING',
            )
            if group_response.data is None or not group_response.data.resources:
                break
            with ThreadPoolExecutor(max_workers=THREADS) as executor:
                futures = [executor.submit(fetch_full_group, g) for g in group_response.data.resources]
                for f in as_completed(futures):
                    res = f.result()
                    if res:
                        group_list.append(res)
            # Stop when the last page was short or we have passed total_results.
            if (
                len(group_response.data.resources) < limit
                or start_index + limit > group_response.data.total_results
            ):
                break
            start_index += limit
        logger.info(f'Fetched {len(group_list)} groups for domain: {domain.display_name}')
        return group_list
    except Exception as e:
        logger.error(f'Exception during group fetch: {e}')
        return []
def _fetch_users_for_domain(self, domain, domain_client):  # noqa: C901
    """
    Fetch users for a domain using paginated listing + ThreadPoolExecutor for user detail calls.

    Lists user summaries page-by-page (1000 at a time), then fetches full user
    details in parallel. Each User is appended incrementally to self.users
    (under a lock) so GUI callbacks can observe progress.

    Args:
        domain: Identity domain summary; must expose display_name.
        domain_client (IdentityDomainsClient): Client scoped to the domain endpoint.

    Returns:
        list[User]: Users fetched for this domain; empty list if an error
            occurred before any user was collected.
    """
    import threading
    logger.info(f'Fetching users for domain: {domain.display_name} with paginator and thread pool')
    # Bug fix: initialize BEFORE the try block so the final `return user_list`
    # cannot raise NameError if an exception occurs early in the try body.
    user_list = []

    def user_summary_generator():
        """Yield user summaries page by page until the listing is exhausted."""
        start_index = 1
        limit = 1000
        while True:
            user_response = self._api_call_with_logging(
                'IdentityDomainsClient.list_users',
                domain_client.list_users,
                start_index=start_index,
                count=limit,
                sort_by='displayName',
                sort_order='ASCENDING',
                attribute_sets=['never'],
            )
            if user_response.data is None or not user_response.data.resources:
                break
            yield from user_response.data.resources
            if len(user_response.data.resources) < limit or start_index + limit > user_response.data.total_results:
                break
            start_index += limit

    def fetch_full_user(u):
        """Fetch full details for one user summary and build a User model (thread-pool worker)."""
        try:
            thread_id = threading.get_ident()
            thread_name = threading.current_thread().name
            logger.debug(
                f"Thread {thread_name} (id={thread_id}) starting fetch_full_user for user_id={getattr(u, 'id', None)} display_name={getattr(u, 'display_name', None)}"
            )
            user_attributes = self._api_call_with_logging(
                'IdentityDomainsClient.get_user',
                domain_client.get_user,
                user_id=u.id,
                attribute_sets=['all'],
            ).data
            logger.debug(
                f"Thread {thread_name} (id={thread_id}) finished fetch_full_user for user_id={getattr(u, 'id', None)} display_name={getattr(u, 'display_name', None)}"
            )
            groups_list = (
                [gg.ocid for gg in getattr(user_attributes, 'groups', []) if hasattr(gg, 'ocid')]
                if hasattr(user_attributes, 'groups') and user_attributes.groups
                else []
            )
            # Prefer the primary email address; fall back to the literal string 'None'.
            email = 'None'
            if hasattr(user_attributes, 'emails') and user_attributes.emails:
                for em in user_attributes.emails:
                    if getattr(em, 'primary', False):
                        email = em.value
                        break
            return User(
                domain_name=domain.display_name,
                user_name=u.user_name,
                user_ocid=u.ocid,
                display_name=u.display_name,
                email=email,
                user_id=u.id,
                groups=groups_list,
            )
        except Exception as exc:
            logger.error(f'Failed to fetch user detail for {u.display_name}: {exc}')
            return None

    # Thread pool for get_user calls, incrementally append to self.users
    try:
        user_lock = threading.Lock()
        with ThreadPoolExecutor(max_workers=THREADS) as executor:
            futures = [executor.submit(fetch_full_user, user_summary) for user_summary in user_summary_generator()]
            for f in as_completed(futures):
                result = f.result()
                if result:
                    # Append incrementally, with lock for thread safety with GUI callbacks
                    with user_lock:
                        self.users.append(result)
                        user_list.append(result)
        logger.info(f'Fetched {len(user_list)} users for domain: {domain.display_name}')
    except Exception as e:
        logger.error(f'Exception during user fetch: {e}')
    return user_list
[docs]
def load_complete_identity_domains(  # noqa: C901
    self, load_all_users: bool = True, compartment_domain_search_depth: int = 1
) -> bool:
    """
    Loads users, groups, dynamic groups, and domains for all compartments up to the given depth
    below the root compartment. No longer uses explicit domain_compartment_ocids.

    Args:
        load_all_users (bool): When True, also fetch every user in each domain;
            when False, self.users is reset to an empty list instead.
        compartment_domain_search_depth (int): Number of compartment levels,
            starting at the root (tenancy), searched for identity domains;
            1 means the root compartment only.

    Returns:
        bool: True when all domains were enumerated and loaded; False when a
            compartment's domain listing fails.

    Raises:
        Exception: Re-raised after logging if loading a domain's identity data
            (or anything else in the flow) fails unexpectedly.
    """
    try:
        # Track already-seen domain IDs so a domain visible from multiple
        # compartment listings is only added once.
        seen_domain_ids = set()
        all_domains = []

        def add_domains_from_compartment(compartment_id: str) -> bool:
            # List domains in a single compartment and merge unseen ones into
            # all_domains. Returns False only when the listing itself failed.
            resp = self._api_call_with_logging(
                'IdentityClient.list_domains', self.identity_client.list_domains, compartment_id=compartment_id
            )
            logger.info(
                f'Listed domains for compartment {compartment_id}: {len(resp.data) if resp and resp.data else 0}'
            )
            if resp.data is None:
                logger.error('Failed to list identity domains for compartment %s', compartment_id)
                return False
            for d in resp.data:
                if d.id not in seen_domain_ids:
                    seen_domain_ids.add(d.id)
                    all_domains.append(d)
            return True

        # Ensure compartments are loaded (critical for depth BFS)
        if not hasattr(self, 'compartments') or not self.compartments:
            logger.warning('Compartments not loaded yet; calling load_policies_and_compartments() to load.')
            self.load_policies_and_compartments()
        if not self.compartments:
            logger.error('Compartment load failed or returned empty. Falling back to root-only search.')
            compartments_to_enumerate = [self.tenancy_ocid]
        else:
            # Breadth-first walk of the compartment tree from the root down to
            # compartment_domain_search_depth levels.
            parent_map = collections.defaultdict(list)
            for comp in self.compartments:
                parent_id = comp.get('parent_id') or self.tenancy_ocid
                parent_map[parent_id].append(comp)
            cur_level = [self.tenancy_ocid]
            all_ocids = set(cur_level)
            for _lvl in range(1, max(1, compartment_domain_search_depth)):
                next_level = []
                for cid in cur_level:
                    for child in parent_map.get(cid, []):
                        child_id = child.get('id')
                        if child_id and child_id not in all_ocids:
                            next_level.append(child_id)
                            all_ocids.add(child_id)
                cur_level = next_level
                if not cur_level:
                    # Tree is shallower than the requested depth; stop early.
                    break
            compartments_to_enumerate = list(all_ocids)
        logger.info(
            f'Enumerating domains from compartments at depth {compartment_domain_search_depth}: {compartments_to_enumerate}'
        )
        for comp_ocid in compartments_to_enumerate:
            logger.info(f'Calling add_domains_from_compartment with: {comp_ocid}')
            if not add_domains_from_compartment(comp_ocid):
                return False
        self.identity_domains = all_domains
        logger.info(
            'Loaded %s identity domains from %s compartments',
            len(self.identity_domains),
            len(compartments_to_enumerate),
        )
        self.domain_clients = {}
        for domain in self.identity_domains:
            try:
                # Get IdentityDomainsClient and hold on to it
                if self.use_instance_principal:
                    domain_client = IdentityDomainsClient(
                        config={}, signer=self.signer, service_endpoint=domain.url
                    )
                elif self.session_token:
                    # Session-token auth: rebuild the signer from the profile's
                    # security token and key files each time.
                    logger.info('Session auth for IdentityDomainsClient')
                    self.config = config.from_file(profile_name=self.session_token)
                    token_file = self.config['security_token_file']
                    token = None
                    with open(token_file) as f:
                        token = f.read()
                    private_key = load_private_key_from_file(self.config['key_file'])
                    self.signer = SecurityTokenSigner(token, private_key)
                    domain_client = IdentityDomainsClient(
                        {'region': self.config['region']}, signer=self.signer, service_endpoint=domain.url
                    )
                    self.tenancy_ocid = self.config['tenancy']
                    logger.info('Success session auth')
                else:
                    # Default path: API-key auth using the already-loaded config.
                    domain_client = IdentityDomainsClient(config=self.config, service_endpoint=domain.url)
                self.domain_clients[domain.id] = domain_client
                # --- Orchestrate loading of Dynamic Groups, Groups, and Users with comments, timing, and logging ---
                # Use log level per settings for timing (critical if "Log All Timings" enabled, info otherwise)
                log_critical = False
                try:
                    if self.settings and isinstance(self.settings, dict):
                        log_critical = self.settings.get('always_log_api_calls', False)
                except Exception:
                    pass
                timing_logger = logger.critical if log_critical else logger.info
                # Fetch and aggregate Dynamic Groups
                t0 = time.perf_counter()
                dg_list = self._fetch_dynamic_groups_for_domain(domain, domain_client)
                elapsed = time.perf_counter() - t0
                timing_logger(
                    f'[API] _fetch_dynamic_groups_for_domain got {len(dg_list)} dynamic groups for {domain.display_name} completed in {elapsed:.2f}s'
                )
                # Dynamic groups have already been incrementally appended in _fetch_dynamic_groups_for_domain
                # Fetch and aggregate Groups
                t0 = time.perf_counter()
                group_list = self._fetch_groups_for_domain(domain, domain_client)
                elapsed = time.perf_counter() - t0
                timing_logger(
                    f'[API] _fetch_groups_for_domain got {len(group_list)} groups for {domain.display_name} completed in {elapsed:.2f}s'
                )
                # Groups are already appended incrementally
                # Fetch and aggregate Users (only if enabled)
                if load_all_users:
                    t0 = time.perf_counter()
                    user_list = self._fetch_users_for_domain(domain, domain_client)
                    elapsed = time.perf_counter() - t0
                    timing_logger(
                        f'[API] _fetch_users_for_domain got {len(user_list)} users for {domain.display_name} completed in {elapsed:.2f}s'
                    )
                    # Users are already appended incrementally
                else:
                    self.users = []
                self.data_as_of = str(datetime.now(UTC))
            except Exception as e:
                logger.error(f'Failed to load groups/users for domain {domain.id}: {e}')
                raise
        logger.info(
            f'Loaded {len(self.groups)} groups, {len(self.users)} users, {len(self.dynamic_groups)} dynamic groups across all domains'
        )
        # Set this so that callback can stop any waiting
        self.identity_loaded_from_tenancy = True
        return True
    except Exception as e:
        logger.error(f'Failed to load identity domains: {e}')
        # return False
        raise e
def _enrich_compartments_with_statement_counts(self):
"""
For each compartment, assign:
- statement_count_direct: # of policy statements defined directly in this compartment.
- statement_count_cumulative: cumulative total including ancestors.
"""
# Build direct count for each compartment by OCID using up-to-date self.regular_statements
statements = getattr(self, 'regular_statements', []) or []
direct_statement_count = {}
for st in statements:
coid = st.get('compartment_ocid')
if not coid:
continue
direct_statement_count[coid] = direct_statement_count.get(coid, 0) + 1
# Assign direct count
for comp in self.compartments or []:
comp_id = comp.get('id')
comp['statement_count_direct'] = direct_statement_count.get(comp_id, 0)
# Now cumulative (for each compartment, sum direct count for self and all ancestors)
comp_by_id = {c.get('id'): c for c in self.compartments or []}
for comp in self.compartments or []:
cumulative = 0
c = comp
visited = set()
while c:
cid = c.get('id')
if cid in visited or not cid:
break
cumulative += direct_statement_count.get(cid, 0)
visited.add(cid)
pid = c.get('parent_id')
if not pid or pid == cid or pid not in comp_by_id:
break
c = comp_by_id[pid]
comp['statement_count_cumulative'] = cumulative
# --- Main Filtering Functions ---
# Filtering logic - return a list of policy statements matching given filter
# Single policy filter function that resolves fuzzy search if provided, exact search if provided, and then other criteria if provided
# If multiple criteria are provided, they are ANDed together
# If multiple values are provided for a single criteria, they are ORed together
# If no criteria are provided, return all policy statements
# If no policy statements exist, return empty list
# Fuzzy and Exact search are mutually exclusive - if both are provided, fuzzy search is used
# If Identity Domains are not loaded and either fuzzy or exact search is requested, raise an error
[docs]
def filter_policy_statements(self, filters: PolicySearch) -> list[RegularPolicyStatement]:  # noqa: C901
    """
    Filter policy statements by one or more criteria.

    Semantics: multiple filter keys are ANDed together; multiple values for a
    single key are ORed. Fuzzy searches (search_users / search_groups /
    search_dynamic_groups) are resolved into exact_groups / exact_dynamic_groups
    first, and exact_users is resolved into exact_groups — note that `filters`
    is mutated in place by that resolution.

    Args:
        filters (PolicySearch): Dictionary of filter keys and their values (e.g. verb, resource, permission, group, etc).
    Returns:
        list[PolicyStatement]: List of statements matching the filter.
    """
    logger.debug(f'Filtering policy statements with criteria: {filters}')
    # If fuzzy or exact search is requested, identity domains must be loaded. If not, raise an error
    # Previously, filtering by group/user/dynamic-group required identity_domains_loaded.
    # This check and logic has been removed per requirements; filtering will proceed regardless.
    # If fuzzy search is provided, use it and ignore exact search.
    self._resolve_fuzzy_search(filters=filters)
    # If exact users were provided for filtering, resolve them to domain/name tuples
    self._resolve_exact_users(filters=filters)
    # At this point we have exact groups or exact dynamic groups to deal with
    logger.debug(f'Post-fuzzy/exact search filters: {filters}')
    # Apply regular search - AND all provided fields except fuzzy search
    results = []
    for stmt in self.regular_statements:
        # match stays True until any filter key rejects this statement.
        match = True
        for key, values in filters.items():
            if key == 'exact_groups':
                # Get the groups from the exact filter
                logger.debug(f'Filtering on exact_groups with values: {values}')
                groups_filter = filters.get('exact_groups', None)
                # Only applies to statements where "subject_type" == "group"
                if stmt.get('subject_type') != 'group':
                    logger.debug(f"Rejecting {stmt.get('policy_name')} due to subject_type not 'group'")
                    match = False
                    break
                # subject is expected to be a list of (domain, name) pairs.
                subjects = stmt.get('subject', [])
                if not isinstance(subjects, list):
                    logger.warning(f'Unexpected Subject format in statement {stmt.get("policy_name")}: {subjects}')
                    match = False
                    break
                if len(groups_filter) == 0:
                    logger.debug('No groups in exact_groups filter, thus no match possible')
                    match = False
                    break
                # A match occurs if any provided domain and group name combo matches any subject in the statement (case-insensitive)
                subj_matched = False
                for subj_domain, subj_name in subjects:
                    # Now we need to iterate the provided groups and see if any match
                    for group in groups_filter:
                        # Missing domain on the filter group defaults to 'default'.
                        group_domain = group.get('domain_name') or 'default'
                        group_name = group.get('group_name')
                        if (
                            subj_domain.casefold() == group_domain.casefold()
                            and subj_name.casefold() == group_name.casefold()
                        ):
                            logger.debug(
                                f'Matched group {subj_domain}/{subj_name} in statement {stmt.get("policy_name")} to filter group {group_domain}/{group_name}'
                            )
                            subj_matched = True
                if not subj_matched:
                    logger.debug(
                        f'No match found for exact_group filter in statement {stmt.get("policy_name")} Text: {stmt.get("statement_text")} Statement: {stmt.get("subject")}'
                    )
                    match = False  # If we get here, no match found
                    break
            # For exact dynamic group, similar logic
            elif key == 'exact_dynamic_groups' and values:
                logger.debug(f'Filtering on exact_dynamic_groups with values: {values}')
                dyn_groups_filter = filters.get('exact_dynamic_groups', [])
                if stmt.get('subject_type') != 'dynamic-group':
                    logger.debug(f"Rejecting {stmt.get('policy_name')} due to Subject Type not 'dynamic-group'")
                    match = False
                    break
                subjects = stmt.get('subject', [])
                if not isinstance(subjects, list):
                    logger.warning(f'Unexpected Subject format in statement {stmt.get("policy_name")}: {subjects}')
                    match = False
                    break
                # Any (domain, name) subject matching any filter dynamic group is a match.
                subj_matched = False
                for subj_domain, subj_name in subjects:
                    for dg in dyn_groups_filter:
                        dg_domain = dg.get('domain_name') or 'default'
                        dg_name = dg.get('dynamic_group_name')
                        if (
                            subj_domain.casefold() == dg_domain.casefold()
                            and subj_name.casefold() == dg_name.casefold()
                        ):
                            logger.debug(
                                f'Matched dynamic group {subj_domain}/{subj_name} in statement {stmt.get("policy_name")} to filter group {dg_domain}/{dg_name}'
                            )
                            subj_matched = True
                if not subj_matched:
                    logger.debug(
                        f'No match found for exact_dynamic_groups filter in statement {stmt.get("policy_name")} Text: {stmt.get("statement_text")} Statement: {stmt.get("subject")}'
                    )
                    match = False  # If we get here, no match found
                    break
            # Compartment special: ROOTONLY
            elif key == 'compartment_path' and 'ROOTONLY' in values:
                # ROOTONLY restricts matches to policies defined at the tenancy root.
                if stmt.get('compartment_ocid') != self.tenancy_ocid:
                    logger.debug(f'Rejecting {stmt.get("policy_name")} due to ROOTONLY restriction')
                    match = False
                    break
            elif key == 'location' and 'tenancy' in values:
                if stmt.get('location_type', '').casefold() != 'tenancy':
                    logger.debug(f'Rejecting {stmt.get("policy_name")} due to location not tenancy')
                    match = False
                    break
            # Once domain cases are done, iterate remaining values
            # Verb enum
            elif key == 'verb':
                # NOTE(review): invalid verbs are only logged, not rejected —
                # VALID_VERBS is presumably a module-level set; confirm intent.
                invalid = set(values) - VALID_VERBS
                if invalid:
                    logger.debug(f'Invalid verbs in filter: {invalid}')
                field_value = str(stmt.get('verb', '')).lower()
                if field_value not in values:
                    logger.debug(f'Rejecting {stmt.get("policy_name")} due to verb mismatch: {field_value}')
                    match = False
                    break
            # Validity check
            elif key == 'valid':
                # 'valid' is a scalar (bool) filter, compared directly rather than ORed.
                valid_value = values
                statement_valid_value = stmt.get('valid', False)
                logger.debug(f'Filtering on validity: {valid_value} vs {statement_valid_value}')
                if valid_value != statement_valid_value:
                    logger.debug(f'Rejecting {stmt.get("policy_name")} due to validity mismatch')
                    match = False
                    break
            # Effective path search
            elif key == 'effective_path':
                # Only the first provided value is used for effective_path.
                filter_eff_value = values[0].lower()
                statement_eff_value = str(stmt.get('effective_path', '')).lower()
                logger.debug(f'Filtering on filt/st {filter_eff_value} vs {statement_eff_value}')
                # Logic here - if the effective path given contains the effective path of the statement,
                # then it is a match. This allows searching for all policies effective in a given compartment and its children.
                if not (filter_eff_value.startswith(statement_eff_value)):
                    logger.debug(
                        f'Rejecting {stmt.get("policy_name")} due to effective_path mismatch: '
                        f'{statement_eff_value} not in {filter_eff_value}'
                    )
                    match = False
                    break
            # Default lookup using column map
            else:
                # Generic case: the filter key doubles as the statement column name;
                # case-insensitive substring OR over the provided values.
                column = key
                logger.debug(f'Filtering on {key} mapped to column {column} with values {values}')
                if not column or not values:
                    logger.debug(f'Unknown filter key: {key} or values empty, skipping')
                    continue
                field_value = str(stmt.get(column, '')).lower()
                if not any(val.lower() in field_value for val in values):
                    logger.debug(f'Rejecting {stmt.get("policy_name")} due to {key} mismatch')
                    match = False
                    break
        if match:
            results.append(stmt)
    logger.info(f'Filter applied. {len(results)} matched out of {len(self.regular_statements)} Regular statements.')
    return results
[docs]
def filter_cross_tenancy_policy_statements(self, alias_filter: list[str]) -> list[RegularPolicyStatement]:
    """
    Filter cross-tenancy policy statements containing any provided alias.

    Args:
        alias_filter (list[str]): List of aliases to look for in statement text.
    Returns:
        list[PolicyStatement]: Filtered cross-tenancy policy statements, each
            statement appearing at most once even if several aliases match.
    """
    filtered = []
    for statement in self.cross_tenancy_statements:
        statement_text = statement.get('statement_text', '')
        for alias_to_check in alias_filter:
            # Check each alias to see if in statement text
            if alias_to_check in statement_text:
                logger.debug(f'Adding statement (alias={alias_to_check}): {statement_text}')
                filtered.append(statement)
                # Bug fix: stop after the first matching alias so a statement
                # matching multiple aliases is not appended multiple times.
                break
    logger.info(f'Returning {len(filtered)} Cross-Tenancy Results')
    return filtered
# -- Identity Domain Related Filtering Functions ---
[docs]
def get_users_for_group(self, group: Group) -> list[User]:
    """
    Return all users that belong to the specified exact group.

    The group is identified by its (domain_name, group_name) pair, compared
    case-insensitively; membership is then resolved through the group's OCID
    as stored on each user's 'groups' list.

    Args:
        group (Group): Dictionary with keys 'domain_name' (str | None, falls
            back to 'default') and 'group_name' (str).

    Returns:
        list[User]: Users in the group; empty when the group is unknown or
            has no members.
    """
    group_domain = group.get('domain_name') or 'default'
    group_name = group['group_name']
    logger.debug(f'Number of groups: {len(self.groups)} Number of users: {len(self.users)}')
    # Resolve the group's OCID first - users reference their groups by OCID.
    group_ocid = next(
        (
            g.get('group_ocid')
            for g in self.groups
            if g.get('group_name', '').casefold() == group_name.casefold()
            and g.get('domain_name', '').casefold() == group_domain.casefold()
        ),
        None,
    )
    if not group_ocid:
        logger.warning(f'Group not found: {group_domain}/{group_name}')
        return []
    logger.debug(f'Group OCID: {group_ocid}')
    # Any user whose groups list contains the OCID is a member.
    matched_users = [u for u in self.users if group_ocid in u.get('groups', [])]
    logger.info(f'Found {len(matched_users)} users for group {group_domain}/{group_name}')
    return matched_users
[docs]
def get_groups_for_user(self, user: User) -> list[Group]:
    """Return the list of all Groups that a user is a member of.

    The user is matched by (user_name, domain_name), case-insensitively; each
    of the matched user's group OCIDs is then resolved against self.groups.

    Args:
        user (User): The user to find groups for.

    Returns:
        list[Group]: A list of Groups that the user is a member of.
    """
    groups_for_user: list[Group] = []
    logger.info(f'User to filter: {user}')
    logger.debug(f'Users: {self.users}')
    # Scan stored users for the requested (name, domain) pair.
    for u in self.users:
        same_name = u.get('user_name', '').casefold() == user.get('user_name').casefold()
        same_domain = u.get('domain_name', 'default').casefold() == user.get('domain_name', 'default').casefold()
        if not (same_name and same_domain):
            continue
        logger.debug(f'User found. Groups: {u.get("groups")}')
        # Resolve each of the user's group OCIDs to its Group record.
        for user_group_ocid in u.get('groups', []):
            for g in self.groups:
                if g.get('group_ocid') != user_group_ocid:
                    continue
                groups_for_user.append(g)
                logger.debug(f'Adding Group {g.get("domain_name")} / {g.get("group_name")} ')
    logger.info(f'Found {len(groups_for_user)} groups for user {user.get("domain_name")} / {user.get("user_name")}')
    return groups_for_user
def _user_search_internal(self, user_filter: UserSearch) -> list[User]:
    """
    Search for users based on the provided filter.

    Each filter field is a list of substring terms ORed together; fields are
    ANDed, with 'search' matching either user_name or display_name. Uses the
    internal key names of the User object.
    """
    logger.info(f'User filter to check: {user_filter}')
    users_return: list[User] = []

    def field_matches(record_value, terms):
        # Empty/absent terms leave the field unconstrained; otherwise
        # case-insensitive substring OR across the terms.
        return not terms or any(term.lower() in str(record_value).lower() for term in terms)

    for u in self.users:
        matches_domain = field_matches(u.get('domain_name'), user_filter.get('domain_name'))
        matches_username = field_matches(u.get('user_name'), user_filter.get('search'))
        matches_display = field_matches(u.get('display_name'), user_filter.get('search'))
        matches_ocid = field_matches(u.get('user_ocid'), user_filter.get('user_ocid'))
        # 'search' may hit either the username or the display name.
        if matches_domain and (matches_username or matches_display) and matches_ocid:
            logger.debug(f'Found a user match: {u} / {user_filter}')
            users_return.append(u)
    logger.info(f'User Search got {len(users_return)} users')
    return users_return
def _group_search_internal(self, group_filter: GroupSearch) -> list[Group]:
    """
    Search for groups based on the provided filter.

    Each filter field is a list of substring terms ORed together; fields are
    ANDed. Uses the internal key names of the Group object.
    """
    logger.info(f'Group filter to check: {group_filter}')
    groups_return: list[Group] = []

    def field_matches(record_value, terms):
        # No terms means the field is unconstrained; otherwise
        # case-insensitive substring OR across the terms.
        return not terms or any(term.lower() in str(record_value).lower() for term in terms)

    for g in self.groups:
        if (
            field_matches(g.get('group_name'), group_filter.get('group_name'))
            and field_matches(g.get('domain_name'), group_filter.get('domain_name'))
            and field_matches(g.get('group_ocid'), group_filter.get('group_ocid'))
        ):
            groups_return.append(g)
    logger.info(f'Group Search returning {len(groups_return)} groups')
    return groups_return
def _dynamic_group_search_internal(self, dg_filter: DynamicGroupSearch) -> list[DynamicGroup]:
    """Search for dynamic groups based on the provided filter.

    Each filter field is a list of substring terms ORed together; fields are
    ANDed. Matching is case-insensitive on both sides, consistent with
    _user_search_internal and _group_search_internal.

    Args:
        dg_filter (DynamicGroupSearch): Optional keys: dynamic_group_name,
            domain_name, dynamic_group_ocid, matching_rule, description.

    Returns:
        list[DynamicGroup]: Matching dynamic groups, reduced to the
            domain_name / dynamic_group_name / dynamic_group_ocid keys.
    """
    logger.info(f'Dynamic Group filter to check: {dg_filter}')
    dgs_return: list[DynamicGroup] = []

    def field_matches(record_value, terms):
        # Bug fix: terms are now lowercased before comparison. Previously a
        # mixed-case term could never match the lowercased record value,
        # unlike the sibling user/group search helpers.
        return not terms or any(term.lower() in str(record_value).lower() for term in terms)

    for dg in self.dynamic_groups:
        if (
            field_matches(dg.get('dynamic_group_name'), dg_filter.get('dynamic_group_name'))
            and field_matches(dg.get('domain_name'), dg_filter.get('domain_name'))
            and field_matches(dg.get('dynamic_group_ocid'), dg_filter.get('dynamic_group_ocid'))
            and field_matches(dg.get('matching_rule'), dg_filter.get('matching_rule'))
            and field_matches(dg.get('description'), dg_filter.get('description'))
        ):
            dgs_return.append(
                {
                    'domain_name': dg.get('domain_name'),
                    'dynamic_group_name': dg.get('dynamic_group_name'),
                    'dynamic_group_ocid': dg.get('dynamic_group_ocid'),
                }
            )
    logger.info(f'Dynamic Group Search returning {len(dgs_return)} dynamic groups')
    return dgs_return
def _resolve_fuzzy_search(self, filters: PolicySearch):  # noqa: C901
    """Look for fuzzy search and turn it into an exact search.

    Mutates `filters` in place: resolves search_users / search_groups /
    search_dynamic_groups into exact_groups / exact_dynamic_groups and removes
    the fuzzy key. The three fuzzy searches are mutually exclusive — user
    search wins, then group search, then dynamic group search.
    """
    def dedup_groups(groups: list[Group]) -> list[Group]:
        # De-duplicate on (domain, group name), preserving first-seen order.
        seen = set()
        deduplicated = []
        for group in groups:
            identifier = (group.get('domain_name') or 'Default', group.get('group_name'))
            if identifier not in seen:
                seen.add(identifier)
                deduplicated.append(group)
        return deduplicated

    logger.debug(f'Resolve fuzzy Groups: {filters.get("search_groups")}')
    logger.debug(f'Resolve fuzzy Users: {filters.get("search_users")}')
    logger.debug(f'Resolve fuzzy DG: {filters.get("search_dynamic_groups")}')
    # First do fuzzy user search
    if filters.get('search_users'):
        user_filter: UserSearch = filters.get('search_users')
        logger.info(f'User filter to check: {user_filter}')
        filtered_users = self._user_search_internal(user_filter)
        logger.info(f'User search returned {len(filtered_users)} users')
        # Now, for each user, get their groups and add to exact groups
        exact_groups: list[Group] = []
        for u in filtered_users:
            exact_groups.extend(self.get_groups_for_user(u))
        exact_groups = dedup_groups(exact_groups)
        # Set exact groups into filter that was passed in
        filters['exact_groups'] = exact_groups
        del filters['search_users']
        logger.info(f'Added {len(exact_groups)} exact groups to filter (removed fuzzy user search)')
    # Next, fuzzy group search
    elif filters.get('search_groups'):
        # Bug fix: this branch previously tested the non-existent key
        # 'search_group' (singular), so fuzzy group searches never resolved.
        group_filter: GroupSearch = filters.get('search_groups')
        exact_groups: list[Group] = self._group_search_internal(group_filter)
        exact_groups = dedup_groups(exact_groups)
        # Set exact groups into filter that was passed in
        filters['exact_groups'] = exact_groups
        # remove the fuzzy search
        del filters['search_groups']
        logger.info(f'Added {len(exact_groups)} exact groups to filter')
    # Finally, fuzzy dynamic group search
    elif filters.get('search_dynamic_groups'):
        dg_filter: DynamicGroupSearch = filters.get('search_dynamic_groups')
        exact_dgs: list[DynamicGroup] = self._dynamic_group_search_internal(dg_filter)
        # Set exact DGs into filter that was passed in
        filters['exact_dynamic_groups'] = exact_dgs
        # Remove fuzzy search
        del filters['search_dynamic_groups']
        logger.info(f'Added {len(exact_dgs)} exact dynamic groups to filter (removed fuzzy dynamic group search)')
    else:
        logger.debug('No fuzzy logic executed, search not changed.')
def _resolve_exact_users(self, filters: PolicySearch):
    """Look for exact users and turn them into groups.

    Mutates `filters` in place: each user in 'exact_users' is matched exactly
    (case-insensitive) against loaded users, the matched users' groups are
    de-duplicated into 'exact_groups', and 'exact_users' is removed.
    """
    user_filter: list[User] = filters.get('exact_users')
    if not user_filter:
        # Nothing to resolve.
        return
    logger.info(f'Exact User filter to check: {user_filter}')
    # Start with no groups and iterate users
    exact_groups: list[Group] = []
    for u in self.users:
        # We need an exact match on domain and username
        user_domain = u.get('domain_name') or 'default'
        user_name = u.get('user_name')
        for filter_user in user_filter:
            filter_domain = filter_user.get('domain_name') or 'default'
            filter_name = filter_user.get('user_name')
            logger.debug(
                f'Checking actual user {user_domain}/{user_name} against filter user {filter_domain}/{filter_name}'
            )
            same_domain = filter_domain.casefold() == user_domain.casefold()
            same_name = filter_name.casefold() == user_name.casefold()
            if not (same_domain and same_name):
                continue
            # get groups for user
            logger.debug(f'Exact user match found: {user_domain}/{user_name}')
            uu: User = {'domain_name': user_domain, 'user_name': user_name}  # type: ignore
            user_groups: list[Group] = self.get_groups_for_user(uu)
            logger.debug(f'User groups: {user_groups}')
            # add groups into exact match in filter
            exact_groups.extend(user_groups)
    # De-dup on (domain, group name), keeping first occurrence.
    seen = set()
    deduplicated_list = []
    for group in exact_groups:
        identifier = (group.get('domain_name') or 'Default', group.get('group_name'))
        if identifier not in seen:
            seen.add(identifier)
            deduplicated_list.append(group)
    # Set exact groups into filter that was passed in
    filters['exact_groups'] = deduplicated_list
    del filters['exact_users']
    logger.info(f'Exact User Search {len(deduplicated_list)} exact groups to filter (removed exact user search)')
[docs]
def filter_groups(self, group_filter: GroupSearch) -> list[Group]:
    """Filter groups based on the provided filter. Public function used by MCP or UI.

    Args:
        group_filter (GroupSearch): Mapping of optional keys (group_name,
            domain_name, group_ocid) to lists of case-insensitive substring terms.

    Returns:
        list[Group]: Groups matching the filter.
    """
    logger.info(f'Filtering Groups based on: {group_filter}')
    # Delegate to the shared internal search (dead `filtered = []` assignment removed).
    filtered: list[Group] = self._group_search_internal(group_filter)
    logger.info(f'Filtered to {len(filtered)} groups')
    return filtered
[docs]
def filter_users(self, user_filter: UserSearch) -> list[User]:
    """
    Filter users based on the provided filter.

    Public entry point used by the MCP interface and the UI; delegates to
    :meth:`_user_search_internal`.

    Args:
        user_filter (UserSearch):
            A dictionary with optional keys.
            * ``domain_name`` (list[str]): Domain names to filter by (case-insensitive).
            * ``search`` (list[str]): Search terms matched against usernames and display names (case-insensitive).
            * ``user_ocid`` (list[str]): User OCIDs to filter by (case-insensitive).

    Returns:
        list[User]:
            Users that match the filter criteria. Each :class:`User` is a
            dictionary with keys ``domain_name`` (str | None), ``user_name``
            (str), ``user_ocid`` (str), ``display_name`` (str), ``email``
            (str), ``user_id`` (str), and ``groups`` (list[str] of group OCIDs).
    """
    logger.info(f'Filtering Users (public) based on: {user_filter}')
    matched: list[User] = self._user_search_internal(user_filter)
    logger.info(f'Filtered to {len(matched)} users')
    for u in matched:
        logger.debug(f'User: {u.get("domain_name")}/{u.get("user_name")} Name:"{u.get("display_name")}"')
    return matched
[docs]
def filter_dynamic_groups(self, filters: DynamicGroupSearch) -> list[DynamicGroup]:
    """
    Filter dynamic groups using JSON-based filters.

    Args:
        filters (DynamicGroupSearch): A mapping of filter keys to one or more values.
            - **OR**: multiple values within a field act as logical OR.
            - **AND**: multiple fields are combined as logical AND.
            **Supported keys** (matched as case-insensitive substrings, except
            ``in_use`` which is compared by truthiness):
            * ``domain_name``
            * ``dynamic_group_name``
            * ``matching_rule``
            * ``dynamic_group_ocid``
            * ``in_use`` (True/False)
            Unknown keys are matched against the dynamic-group dict directly
            (a missing key simply matches nothing).

    Returns:
        list[DynamicGroup]: A list of dynamic groups that satisfy the filters.
        Each dynamic group is a dictionary with keys ``domain_name``,
        ``dynamic_group_name``, ``dynamic_group_id``, ``dynamic_group_ocid``,
        ``matching_rule``, ``description``, ``in_use``, ``creation_time``,
        ``created_by_name`` and ``created_by_ocid``.
    """
    results = []
    # Only INFO if non-empty or filters indicate stateful/intentional request, else DEBUG
    if self.dynamic_groups or filters:
        logger.info(f'Filtering Dynamic Groups based on: {filters}')
    else:
        logger.debug(f'Filtering Dynamic Groups based on: {filters} (no data loaded yet)')
    for dg in self.dynamic_groups:
        match = True
        for key, values in filters.items():
            # Check in-use first because it is a boolean equality test, not a substring match
            if key == 'in_use':
                # Accept either a bare bool or a single-element list; compare truthiness.
                # BUGFIX: the previous logic only matched when both sides were falsy,
                # so in_use=True could never match an in-use dynamic group.
                wanted = values[0] if isinstance(values, (list, tuple)) and values else values
                if bool(wanted) == bool(dg.get('in_use', False)):
                    logger.debug(
                        f'DG included {dg.get("dynamic_group_name")} due to in_use match: {dg.get("in_use")} = {values}'
                    )
                    continue
                logger.debug(
                    f'DG rejected {dg.get("dynamic_group_name")} in_use: {dg.get("in_use")} != {values}'
                )
                match = False
                break
            if not values:
                # Empty filter values mean "no constraint" for this key
                logger.debug(f'Skipping empty filter for key: {key}')
                continue
            # Lowercase once here; the any() below compares pre-lowered values.
            values = [v.lower() for v in values]
            logger.debug(f'Filtering on {key} mapped to column {key} with values {values}')
            field_value = str(dg.get(key, '')).lower()
            logger.debug(f'Field value for {key}: {field_value}')
            # OR semantics: any of the filter values may appear as a substring
            if not any(val in field_value for val in values):
                logger.debug(f'Rejecting DG {dg.get("dynamic_group_name")} due to {key} mismatch')
                match = False
                break
        if match:
            results.append(dg)
    if self.dynamic_groups or filters:
        logger.info(f'Filter applied. {len(results)} matched out of {len(self.dynamic_groups)} Dynamic Groups.')
    else:
        logger.debug(f'Filter applied. {len(results)} matched out of 0 Dynamic Groups (pre-load state)')
    return results
# --- Other Public Functions ---
# Not in use: _check_history below is an experimental audit-history lookup with no current callers
def _check_history(self, policy_ocid: str, start_time: str) -> None:
    """
    Look at audit logs to track changes to a policy.

    Searches the tenancy ``_Audit`` log for policy Create/Update/Delete events
    and logs each event's previous/current statement lists.

    Args:
        policy_ocid (str): OCID of the policy of interest.
            NOTE(review): currently unused - the query returns events for ALL
            policies; confirm whether results should be filtered by this OCID.
        start_time (str): ISO-8601 start of the search window, e.g.
            '2025-07-10T11:59:00Z'.
    """
    the_log = f'{self.tenancy_ocid}/_Audit'
    logs_returned = self._api_call_with_logging(
        'LogSearchClient.search_logs',
        self.logging_search_client.search_logs,
        search_logs_details=SearchLogsDetails(
            search_query=f"search \"{the_log}\" | (type in ('com.oraclecloud.identityControlPlane.UpdatePolicy','com.oraclecloud.identityControlPlane.CreatePolicy','com.oraclecloud.identityControlPlane.DeletePolicy')) | sort by datetime desc",
            # BUGFIX: honor the caller-supplied window instead of hard-coded debug
            # dates; search from start_time up to now (UTC).
            time_start=start_time,
            time_end=datetime.now(UTC).strftime('%Y-%m-%dT%H:%M:%SZ'),
        ),
        limit=1000,
    )
    if logs_returned and logs_returned.data and logs_returned.data.results:
        logger.info(f'Found {len(logs_returned.data.results)} logs for policy updates in the last 24 hours')
        for log in logs_returned.data.results:
            res: SearchResult = log
            if res and res.data:
                # Guard every level of the nested payload: Create events have no
                # 'previous' state and Delete events may have no 'current' state.
                log_content = res.data.get('logContent') or {}
                type_of_log = log_content.get('type')
                state_change = (log_content.get('data') or {}).get('stateChange') or {}
                change_curr = (state_change.get('current') or {}).get('statements')
                previous = state_change.get('previous')
                change_prev = previous.get('statements') if previous else None
                logger.info(f'Log Type: {type_of_log}')
                logger.info(f'***Log Details: Type: {type_of_log} Previous:{change_prev} Current:{change_curr}')
    else:
        logger.info('No policy update logs found in the last 24 hours')
def _get_domains(self) -> list:
    """Return a slim view of the loaded identity domains (id, display_name, url only)."""
    # NOTE(review): this reads object attributes (d.id) while
    # _get_domain_name_from_ocid reads dict keys - confirm how
    # self.identity_domains is populated in each load path.
    slim_domains = []
    for domain in self.identity_domains:
        slim_domains.append(
            {
                'id': domain.id,
                'display_name': domain.display_name,
                'url': domain.url,
            }
        )
    return slim_domains
# --- Compliance Output Loading ---
# Because we are not using OCI clients here, we need to load from CSV files
# We need to load in this order:
# 1. Domains (and tenancy OCID)
# 2. Dynamic Groups
# 3. Groups + Membership
# 4. Users
# 4a. Augment users with group membership
# 5. Compartments
# 5a. Augment compartment data with path strings (cannot use client here)
# 6. Policies
def _get_domain_name_from_ocid(self, domain_ocid: str) -> str:
    """
    Given a domain OCID, return the domain name from loaded domains.

    Args:
        domain_ocid (str): OCID of the identity domain; an empty/None value
            maps to the 'Default' domain.

    Returns:
        str: The matching domain's display name, or 'Default' when the OCID
        is empty or not found.
    """
    # Falsy check covers both None and '' (the old `or domain_ocid == ''` was redundant)
    if not domain_ocid:
        return 'Default'
    # NOTE(review): domains are read as dicts here, but _get_domains reads
    # attributes - confirm the element type of self.identity_domains.
    for domain in self.identity_domains:
        if domain.get('id') == domain_ocid:
            return domain.get('display_name', 'Default')
    return 'Default'
def _get_hierarchy_path_for_compartment(self, compartment, comp_string: str) -> str:
    """
    Given a compartment JSON dict, return the full hierarchy path as a string.

    Walks ``parent_id`` links upward through ``self.compartments`` and joins
    the compartment names with '/'.

    Args:
        compartment (dict): Compartment record with 'id', 'name' and 'parent_id' keys.
        comp_string (str): Label used only in the debug log message.

    Returns:
        str: 'ROOT' for the tenancy itself, otherwise a 'parent/child/...' path.
    """
    # If OCID is the tenancy OCID, return ROOT
    if compartment.get('id') == self.tenancy_ocid:
        return 'ROOT'
    path_parts = []
    visited: set = set()  # ROBUSTNESS: guard against parent_id cycles in bad CSV data
    current_comp = compartment
    while current_comp:
        comp_id = current_comp.get('id')
        if comp_id in visited:
            logger.warning(f'Cycle detected in compartment hierarchy at {comp_id}; truncating path')
            break
        visited.add(comp_id)
        path_parts.append(current_comp.get('name', 'Unknown'))
        parent_id = current_comp.get('parent_id')
        # Stop at the root (no parent) or a self-referencing record
        if not parent_id or parent_id == comp_id:
            break
        # Find parent compartment in loaded compartments
        current_comp = next((comp for comp in self.compartments if comp.get('id') == parent_id), None)
    # Reverse the path parts to get from root to leaf
    path_parts.reverse()
    full_path = '/'.join(path_parts)
    logger.debug(f'Compartment {comp_string} full path: {full_path}')
    return full_path
[docs]
def load_from_compliance_output_dir(self, dir_path: str, load_all_users: bool = True) -> bool:  # noqa: C901
    """
    Load all compartments, domains, groups, users, dynamic groups, and policies from compliance tool output files.
    Always resets the reload time (`policy_data_reloaded`) so that reload is not shown for compliance/CSV data.
    Starts with domains, then dynamic groups, then groups/membership, then users, then compartments, then policies.
    This function is for offline/compliance output analysis: no attempt to initialize any OCI client.
    Args:
        dir_path (str): Path to a directory containing the expected compliance output files.
        load_all_users (bool): If False, skip loading users. Default is True.
    Returns:
        bool: True if all files parsed and data loaded successfully, False otherwise.
    """
    # Local import: ast.literal_eval safely parses the Python-literal columns
    # (statements, groups, tags) in the compliance CSVs. SECURITY: eval() on
    # file content is an arbitrary-code-execution risk and is avoided here.
    import ast

    # Explicit: always clear reload time before compliance/CSV load.
    self.policy_data_reloaded = None
    logger.info(f'Loading compliance data from output dir: {dir_path}')
    # Optional pre-step: special case for compliance domains CSV.
    # In the raw_data_identity_domains.csv export, the tenancy OCID is represented
    # as the compartment_id of the row whose display_name is "Default Domain".
    # When present, we use that compartment_id to seed self.tenancy_ocid so that
    # downstream usage/limits logic (including usage tracking) has a correct
    # tenancy OCID even if the compartments CSV is incomplete.
    domains_csv_path = os.path.join(dir_path, 'raw_data_identity_domains.csv')
    if os.path.exists(domains_csv_path):
        try:
            with open(domains_csv_path, encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    display_name = (row.get('display_name') or '').strip()
                    if display_name.lower() == 'default domain':
                        default_domain_compartment_id = (row.get('compartment_id') or '').strip()
                        if default_domain_compartment_id:
                            # SPECIAL CASE: in compliance output, the tenancy OCID is the
                            # compartment_id of the Default Domain row.
                            self.tenancy_ocid = default_domain_compartment_id
                            logger.info(
                                'Set tenancy_ocid from raw_data_identity_domains.csv Default Domain compartment_id: %s',
                                self.tenancy_ocid,
                            )
                        break
        except Exception as e:
            logger.error(f'Failed to read tenancy_ocid from raw_data_identity_domains.csv: {e}')
    # We need to only use the CSV files and stop using the JSON file altogether
    try:
        # Step 1: Set the tenancy OCID and Name from the data
        with open(os.path.join(dir_path, 'raw_data_identity_compartments.csv'), encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                if row.get('id', '').startswith('ocid1.tenancy.'):
                    # If tenancy_ocid was already set from domains CSV, keep it;
                    # otherwise, use the value from compartments.
                    if not self.tenancy_ocid:
                        self.tenancy_ocid = row.get('id', '')
                    self.tenancy_name = row.get('name', '')
                    logger.info(
                        'Set tenancy OCID to %s and name to %s (compartments CSV)',
                        self.tenancy_ocid,
                        self.tenancy_name,
                    )
                    break
        if not self.tenancy_ocid or not self.tenancy_name:
            logger.error('Could not find tenancy OCID and name in compartments CSV')
            return False
        # --- Step 2: Load Dynamic Groups ---
        dgs_file = os.path.join(dir_path, 'raw_data_identity_dynamic_groups.csv')
        with open(dgs_file, encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                created_by = row.get('idcs_created_by', '{}')
                try:
                    created_by_json = json.loads(created_by)
                    # NOTE(review): 'odid' looks like a typo for 'ocid' - confirm
                    # the key name against the idcs_created_by JSON in the export.
                    created_by_ocid = created_by_json.get('odid', 'n/a')
                except json.JSONDecodeError:
                    created_by_ocid = 'n/a'
                domain_ocid = row.get('domain_ocid', '')
                domain_name = self._get_domain_name_from_ocid(domain_ocid)
                dg: DynamicGroup = {
                    'domain_name': domain_name or 'Default',
                    'dynamic_group_name': row.get('display_name') or '',
                    'dynamic_group_id': 'n/a',
                    'dynamic_group_ocid': row.get('ocid', ''),
                    'matching_rule': row.get('matching_rule', ''),
                    'description': row.get('description') or '',
                    'in_use': True,  # Default to True; will be updated later
                    'creation_time': 'n/a',
                    'created_by_name': 'n/a',
                    'created_by_ocid': created_by_ocid,
                }
                self.dynamic_groups.append(dg)
        logger.info(f'Loaded {len(self.dynamic_groups)} dynamic groups from CSV')
        # --- Step 3: Load Groups ---
        # (Removed dead user_membership/user_domains locals - they were written
        # but never read; user group membership comes from the users CSV below.)
        groups_file = os.path.join(dir_path, 'raw_data_identity_groups_and_membership.csv')
        seen_groups = set()
        with open(groups_file, encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                group: Group = {
                    'domain_name': row.get('domain_deeplink', '').split('","')[-1].rstrip('")')
                    if 'domain_deeplink' in row
                    else 'Default',
                    'group_name': row.get('name') or '',
                    'group_ocid': row.get('id') or '',
                    'description': row.get('description') or '',
                    'group_id': row.get('id') or '',
                }
                logger.debug(f'Processing group: {group}')
                # The CSV repeats the group once per member row - dedupe by (domain, name)
                group_key = (group['domain_name'], group['group_name'])
                if group_key in seen_groups:
                    continue
                seen_groups.add(group_key)
                self.groups.append(group)
        logger.debug(f'Loaded {len(self.groups)} groups')
        # PERF: O(1) lookup from (domain, group name) to group OCID for the user
        # augmentation below, instead of a linear scan per group per user.
        group_ocid_by_key = {(g['domain_name'], g['group_name']): g.get('group_ocid', '') for g in self.groups}
        # --- Step 4: Load Users, unless disabled ---
        self.users = []
        if load_all_users:
            users_file = os.path.join(dir_path, 'raw_data_identity_users.csv')
            with open(users_file, encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for user_item in reader:
                    logger.debug(f'Processing user item: {user_item}')
                    user: User = {
                        'domain_name': user_item.get('domain_deeplink', '').split('","')[-1].rstrip('")')
                        if 'domain_deeplink' in user_item
                        else 'Default',
                        'user_name': user_item.get('name') or '',  # No way to get username or email
                        'user_ocid': user_item.get('id') or '',
                        'display_name': user_item.get('name') or '',
                        'email': user_item.get('email') or '',
                        'user_id': user_item.get('external_identifier') or '',
                        'groups': [],
                    }
                    group_names_str = user_item.get('groups', '') or ''
                    # literal_eval (not eval): the column is a stringified Python list
                    group_names = ast.literal_eval(group_names_str) if group_names_str else []
                    group_ocids = []
                    for group_name in group_names:
                        lookup_key = (user['domain_name'], group_name)
                        if lookup_key in group_ocid_by_key:
                            group_ocids.append(group_ocid_by_key[lookup_key])
                    user['groups'] = group_ocids
                    # Per-item trace is debug; summary below stays at info
                    logger.debug(f'Loaded user: {user}')
                    self.users.append(user)
            logger.info(f'Loaded {len(self.users)} users')
        else:
            logger.info('Skipping load of users due to load_all_users=False')
        # -- Step 5: Load Compartments ---
        compartments_file = os.path.join(dir_path, 'raw_data_identity_compartments.csv')
        with open(compartments_file, encoding='utf-8') as f:
            reader = csv.DictReader(f)
            # Iterate compartments and add to list
            for comp_item in reader:
                compartment = {
                    'id': comp_item.get('id') or '',
                    'name': comp_item.get('name') or '',
                    'hierarchy_path': None,  # will be built later
                    'lifecycle_state': comp_item.get('lifecycle_state') or '',
                    'parent_id': comp_item.get('compartment_id') or '',
                    'description': comp_item.get('description') or '',
                }
                logger.debug(f'Processing compartment: {compartment}')
                # Only add ACTIVE compartments
                if compartment['lifecycle_state'] == 'ACTIVE':
                    self.compartments.append(compartment)
                else:
                    logger.debug(
                        f"Skipping compartment {compartment['name']} with lifecycle state {compartment['lifecycle_state']}"
                    )
        # For some reason the root compartment is not included - add it manually
        root_compartment = Compartment(
            id=self.tenancy_ocid,
            name='ROOT',
            parent_id='',
            hierarchy_path='',
            description='',
            lifecycle_state='ACTIVE',
        )
        self.compartments.append(root_compartment)
        logger.debug(f'Loaded {len(self.compartments)} compartments')
        # Now build hierarchy paths for each compartment
        for comp in self.compartments:
            logger.info(f"Building path for compartment {comp.get('name','n/a')}")
            comp['hierarchy_path'] = self._get_hierarchy_path_for_compartment(comp, '')
        logger.info('Built hierarchy paths for compartments')
        # Debug just the compartment name and path for all compartments
        for comp in self.compartments:
            logger.info(f"Compartment: {comp.get('name','n/a')} Path: {comp.get('hierarchy_path','n/a')}")
        # --- Step 6: Load Policies ---
        policies_file = os.path.join(dir_path, 'raw_data_identity_policies.csv')
        with open(policies_file, encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for policy_item in reader:
                # Create a Policy object for the Policy itself
                # Try to extract tags from CSV: expects column "tags" as a JSON or stringified dict (optional)
                tags = None
                if 'tags' in policy_item:
                    tags_str = policy_item.get('tags') or ''
                    if tags_str:
                        try:
                            # literal_eval (not eval): safe parse of a stringified dict
                            tags_candidate = ast.literal_eval(tags_str) if tags_str.startswith('{') else tags_str
                            if isinstance(tags_candidate, dict):
                                tags = tags_candidate
                        except Exception:
                            pass  # best-effort: malformed tags are ignored
                comp_path = next(
                    (
                        comp['hierarchy_path']
                        for comp in self.compartments
                        if comp['id'] == policy_item.get('compartment_id')
                    ),
                    'ROOT',
                )
                policy_obj = BasePolicy(
                    policy_name=policy_item.get('name') or '',
                    policy_ocid=policy_item.get('id') or '',
                    compartment_ocid=policy_item.get('compartment_id') or '',
                    compartment_path=comp_path,
                    description=policy_item.get('description') or '',
                    creation_time='',
                    tags=tags if isinstance(tags, dict) else {},
                    freeform_tags=tags if isinstance(tags, dict) else {},
                )
                logger.debug(f'Processing policy: {policy_obj}')
                self.policies.append(policy_obj)
                # Get the basic details here and then iterate statements - those are to be added to the list
                # NOTE(review): statements use the 'identifier' column while BasePolicy
                # above uses 'id' - confirm both columns exist in the CSV export.
                policy_ocid = policy_item.get('identifier') or ''
                comp_id = policy_item.get('compartment_id') or ''
                policy_name = policy_item.get('name') or ''
                creation_time = policy_item.get('time_created') or ''
                # The 'statements' column is a stringified Python list of statement strings;
                # literal_eval (not eval) parses it without executing anything.
                statements = ast.literal_eval(policy_item.get('statements') or '[]')
                logger.debug(f'Policy {policy_name} has {len(statements)} statements')
                # Iterate each statement, determine type, and proceed to parse
                for statement_text in statements:
                    # DO NOT lowercase statement text - preserve original case
                    stripped_statement = statement_text.strip()
                    base_policy_statement: BasePolicyStatement = BasePolicyStatement(
                        policy_name=policy_name,
                        policy_ocid=policy_ocid,
                        compartment_ocid=comp_id,
                        compartment_path=comp_path,
                        statement_text=stripped_statement,
                        creation_time=creation_time,
                        internal_id=hashlib.md5((stripped_statement + '' + policy_ocid).encode()).hexdigest(),
                        parsed=False,
                    )
                    logger.debug(f'Processing statement: {statement_text}')
                    st_text_lower = stripped_statement.lower()
                    # Parse the statement now - cannot use the existing parser as is because it relies on OCI clients
                    if st_text_lower.startswith('define'):
                        # Parse as DefineStatement
                        define_statement: DefineStatement = DefineStatement(**base_policy_statement)
                        if not self._parse_define_statement(policy_obj, define_statement):
                            logger.debug(f'Define statement was unable to parse: {statement_text}')
                        logger.debug(f'Parsed define statement: {define_statement}')
                    # Admit and Deny Admit
                    elif st_text_lower.startswith('admit') or st_text_lower.startswith('deny admit'):
                        admit_statement: AdmitStatement = AdmitStatement(**base_policy_statement)
                        if not self._parse_admit_statement(policy_obj, admit_statement):
                            logger.debug(f'Admit statement was unable to parse: {statement_text}')
                        logger.debug(f'Parsed admit statement: {admit_statement}')
                    # Endorse Statement
                    elif st_text_lower.startswith('endorse'):
                        endorse_statement: EndorseStatement = EndorseStatement(**base_policy_statement)
                        if not self._parse_endorse_statement(policy_obj, endorse_statement):
                            logger.debug(f'Endorse statement was unable to parse: {statement_text}')
                        logger.debug(f'Parsed endorse statement: {endorse_statement}')
                    else:
                        # Regular Policy Statement
                        regular_statement: RegularPolicyStatement = RegularPolicyStatement(**base_policy_statement)
                        parsed_statement_valid = self._parse_statement(policy_obj, regular_statement)
                        if not parsed_statement_valid:
                            logger.warning(f'Invalid policy statement detected: {statement_text}')
                        logger.debug(f'Parsed regular policy statement: {regular_statement}')
        logger.info(f'Loaded {len(self.regular_statements)} policy statements')
        self.data_as_of = datetime.now(UTC).isoformat()
        # For compliance/JSON loads, explicitly clear the reload date unless recovered from cache elsewhere
        self.policy_data_reloaded = None
        self.loaded_from_compliance_output = True
        # After all policy statements loaded, enrich compartment counts
        self._enrich_compartments_with_statement_counts()
        logger.info('Compliance output data loaded successfully.')
        return True
    except Exception as e:
        # logger.exception records the full stack trace for debugging
        logger.exception(f'Compliance output data load failed: {e}')
        return False