##########################################################################
# Copyright (c) 2024, Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
# DISCLAIMER This is not an official Oracle application, It does not supported by Oracle Support.
#
# data_repo.py
#
# @author: Andrew Gregory
#
# Supports Python 3.12 and above
#
# coding: utf-8
##########################################################################
# Standard library imports
import csv
import hashlib
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import UTC, datetime
from pathlib import Path
# Third-party imports
from oci import config, pagination
from oci.auth.signers import InstancePrincipalsSecurityTokenSigner, SecurityTokenSigner
from oci.exceptions import ConfigFileNotFound
from oci.identity import IdentityClient
from oci.identity.models import Compartment
from oci.identity_domains import IdentityDomainsClient
from oci.identity_domains.models import DynamicResourceGroup
from oci.loggingsearch import LogSearchClient
from oci.loggingsearch.models import SearchLogsDetails, SearchResult
from oci.resource_search import ResourceSearchClient
from oci.resource_search.models import StructuredSearchDetails
from oci.signer import load_private_key_from_file
from oci_policy_analysis.common.logger import get_logger
from oci_policy_analysis.common.models import (
AdmitStatement,
BasePolicy,
BasePolicyStatement,
DefineStatement,
DynamicGroup,
DynamicGroupSearch,
EndorseStatement,
Group,
GroupSearch,
PolicySearch,
RegularPolicyStatement,
User,
UserSearch,
)
from oci_policy_analysis.logic.policy_statement_normalizer import PolicyStatementNormalizer
from oci_policy_analysis.logic.reference_data_repo import ReferenceDataRepo
# Global logger for this module
logger = get_logger(component='data_repo')
# Constants
THREADS = 8
# Cache Directory and Date (for consistency across classes)
CACHE_DIR = Path.home() / '.oci-policy-analysis' / 'cache'
# For MCP-specific JSON
VALID_VERBS = {'inspect', 'read', 'use', 'manage'}
[docs]
class PolicyAnalysisRepository:
"""
This is the main data repository for Policy, Identity, and Compartment data
During initialization, the entire compartment hierarchy and policy tree is loaded into a central JSON dictionary.
This central dictionary is then referenced by functions that filter and return a subset of information for display.
Parsing, additional analysis, and import/export are made available by additional functions exposed.
Loading of data starts from `load_policies_and_compartments`, which loads all compartments and policies recursively
Filtering functions return lists of dataclass objects defined in models.py for easy consumption by UI or CLI layers.
See `filter_policy_statements` for an example of filtering and returning PolicyStatement objects.
"""
def __init__(self):
self.compartments = [] # List of dicts: {id, name, parent_id, hierarchy_path, hierarchy_ocids}
self.policies: list[BasePolicy] = [] # List of BasePolicy dicts
self.regular_statements: list[RegularPolicyStatement] = []
self.cross_tenancy_statements = []
self.defined_aliases: list[DefineStatement] = [] # Store define statements as list of dict
self.dynamic_groups = []
self.identity_domains = []
self.groups = []
self.users: list[User] = []
self.domain_clients = {}
self.data_as_of = ''
self.tenancy_ocid = None
self.identity_client = None
self.identity_loaded_from_tenancy = False
self.policies_loaded_from_tenancy = False
self.version = 2
self.load_all_users = True
# Keep the refence data repo as a member
# self.permission_reference_repo = ReferenceDataRepo()
self.permission_reference_repo = None
# self.on_policy_statements_updated = None # Optional callback, set by UI for reload hooks
logger.info('Initialized PolicyAnalysisRepo')
# Create a Normalizer instance
self.normalizer = PolicyStatementNormalizer()
[docs]
def reset_state(self):
"""
Resets all main state variables (lists, dictionaries, flags, clients, IDs, etc.).
Call this before any data (re)load operation for a clean repository state.
"""
self.compartments = []
self.policies = []
self.regular_statements = []
self.cross_tenancy_statements = []
self.defined_aliases = []
self.dynamic_groups = []
self.identity_domains = []
self.groups = []
self.users = []
self.domain_clients = {}
self.data_as_of = ''
self.tenancy_ocid = None
self.identity_client = None
self.identity_loaded_from_tenancy = False
self.policies_loaded_from_tenancy = False
self.version = 1
self.load_all_users = True
self.permission_reference_repo = ReferenceDataRepo()
# If there are additional ephemeral analysis/cache attributes, reset them here
# (e.g., self._policy_progress_queue, self.normalizer, cached_*, etc.)
logger.info('PolicyAnalysisRepository state has been reset.')
[docs]
def initialize_client(
self,
use_instance_principal: bool,
session_token: str | None = None,
recursive: bool = True,
profile: str = 'DEFAULT',
) -> bool:
"""Initializes the OCI client to be used for all data operations
Client can be loaded using PROFILE or Instance Principal authentication methods
Args:
use_instance_principal: Whether to attempt Instance Principal signer-based authentication
recursive: Whether to load tenancy data across all compartments, or simply the root (tenancy) compartment
session: The named OCI Session Token Profile to use - must be present on the file system in the standard OCI location of .oci/config
profile: The named OCI Profile to use - must be present on the file system in the standard OCI location of .oci/config
Returns:
A boolean indicating whether the client was created successfully. False indicates that an unrecoverable issue occurred
setting up the client.
"""
self.session_token = session_token
self.use_instance_principal = use_instance_principal
try:
if use_instance_principal:
logger.debug('Using Instance Principal Authentication')
self.signer = InstancePrincipalsSecurityTokenSigner()
# Identity for all policy Data
# Identity for all policy Data
self.identity_client = IdentityClient(config={}, signer=self.signer)
self.logging_search_client = LogSearchClient(config={}, signer=self.signer)
# Resource Search Client
self.resource_search_client = ResourceSearchClient(config={}, signer=self.signer)
# Resource Search Client
self.resource_search_client = ResourceSearchClient(config={}, signer=self.signer)
self.tenancy_ocid = self.signer.tenancy_id
elif session_token:
logger.info('Attempt session auth')
self.config = config.from_file(profile_name=session_token)
token_file = self.config['security_token_file']
token = None
with open(token_file) as f:
token = f.read()
private_key = load_private_key_from_file(self.config['key_file'])
self.signer = SecurityTokenSigner(token, private_key)
self.identity_client = IdentityClient({'region': self.config['region']}, signer=self.signer)
# Resource Search Client
self.resource_search_client = ResourceSearchClient(
{'region': self.config['region']}, signer=self.signer
)
self.tenancy_ocid = self.config['tenancy']
logger.info('Success session auth')
else:
logger.debug(f'Using Profile Authentication: {profile}')
self.config = config.from_file(profile_name=profile)
self.identity_client = IdentityClient(self.config)
self.logging_search_client = LogSearchClient(self.config)
self.tenancy_ocid = self.config['tenancy']
# Resource Search Client
self.resource_search_client = ResourceSearchClient(self.config)
# Resource Search Client
self.resource_search_client = ResourceSearchClient(self.config)
logger.info(f'Set up Identity Client for tenancy: {self.tenancy_ocid}')
# Set Recursion
self.recursive = recursive
logger.debug(f'Set recursive to: {self.recursive}')
# Get tenancy name
self.tenancy_name = self.identity_client.get_compartment(compartment_id=self.tenancy_ocid).data.name
logger.info(f'Initialized client for tenancy: {self.tenancy_name} ({self.tenancy_ocid})')
return True
except (ConfigFileNotFound, Exception) as exc:
logger.fatal(f'Authentication failed: {exc}')
return False
[docs]
def check_statement_location_validity(self, st):
"""
Checks if the compartment location for a statement is valid (exists and is ACTIVE).
Args:
st: The policy statement (dict).
Returns:
None if valid; string message if invalid.
"""
if st.get('location_type') == 'compartment id':
logger.info(f'Checking location validity for statement: {st.get("statement_text")}')
location_ocid = st.get('location')
if not self._check_invalid_location(location_ocid):
return f'Compartment OCID {location_ocid} not found in tenancy'
return None
def _check_invalid_location(self, compartment_ocid) -> bool:
"""
Given a compartment OCID-based location, return False if there is no compartment (any more)
or if the compartment is not ACTIVE. True if it exists and is ACTIVE.
Called from Policy IntelligenceEngine.find_invalid_statements() - only doing this now because of the OCI Client needed
"""
try:
comp: Compartment = self.identity_client.get_compartment(compartment_id=compartment_ocid).data
if comp.lifecycle_state == Compartment.LIFECYCLE_STATE_ACTIVE:
return True
else:
logger.warning(f'Found Compartment but not ACTIVE: {compartment_ocid} was: {comp.lifecycle_state}')
return False
except Exception as e:
# Any error means it is invalid
logger.debug(f'Compartment OCID {compartment_ocid} not valid: {e}')
return False
def _parse_define_statement(self, policy: BasePolicy, statement: DefineStatement) -> bool:
"""
This is now a thin wrapper calling the centralized PolicyStatementNormalizer.
"""
try:
# Use definition's base model fields for required meta
base = {
k: statement[k]
for k in [
'policy_name',
# 'policy_description',
'policy_ocid',
'compartment_ocid',
'compartment_path',
'creation_time',
'internal_id',
]
if k in statement
}
normalized = self.normalizer.normalize(
statement_text=statement['statement_text'], statement_type='define', base_fields=base
)
if isinstance(normalized, dict) and not normalized.get('parsed', True):
# convert statement to dict to ensure we can add fields
statement_dict = dict(statement)
statement_dict['parsed'] = False
statement_dict['valid'] = False
statement_dict['invalid_reasons'] = normalized.get('invalid_reasons', [])
logger.debug(
f'Define statement was unable to normalize: {statement_dict.get("statement_text")} | Reason: {statement_dict.get("invalid_reasons")}'
)
self.defined_aliases.append(statement_dict)
return False
self.defined_aliases.append(normalized)
logger.debug(f'Define Statement Added: {normalized}')
return True
except Exception as e:
statement['parsed'] = False
statement['valid'] = False
statement['invalid_reasons'] = [f'Normalize define statement failed: {e}']
logger.debug(f'Normalize define statement failed: {e}')
self.defined_aliases.append(statement)
return False
def _parse_admit_statement(self, policy: BasePolicy, statement: AdmitStatement) -> bool:
"""
This is now a thin wrapper calling the centralized PolicyStatementNormalizer.
"""
try:
base = {
k: statement[k]
for k in [
'policy_name',
# 'policy_description',
'policy_ocid',
'compartment_ocid',
'compartment_path',
'creation_time',
'internal_id',
]
if k in statement
}
normalized = self.normalizer.normalize(
statement_text=statement['statement_text'], statement_type='admit', base_fields=base
)
if isinstance(normalized, dict) and not normalized.get('parsed', True):
statement_dict = dict(statement)
statement_dict['parsed'] = False
statement_dict['valid'] = False
statement_dict['invalid_reasons'] = normalized.get('invalid_reasons', [])
logger.debug(
f"Admit statement was unable to normalize: {statement_dict.get('statement_text')} | Reason: {statement_dict.get('invalid_reasons')}"
)
self.cross_tenancy_statements.append(statement_dict)
return False
self.cross_tenancy_statements.append(normalized)
logger.debug(f'Admit Statement Added: {normalized}')
return True
except Exception as ex:
statement['valid'] = False
statement['parsed'] = False
statement['invalid_reasons'] = [f'Normalize admit parser failed: {ex}']
logger.debug(f'Normalize admit parser failed: {ex}')
self.cross_tenancy_statements.append(statement)
return False
def _parse_endorse_statement(self, policy: BasePolicy, statement: EndorseStatement) -> bool:
"""
This is now a thin wrapper calling the centralized PolicyStatementNormalizer.
"""
try:
base = {
k: statement[k]
for k in [
'policy_name',
# 'policy_description',
'policy_ocid',
'compartment_ocid',
'compartment_path',
'creation_time',
'internal_id',
]
if k in statement
}
normalized = self.normalizer.normalize(
statement_text=statement['statement_text'], statement_type='endorse', base_fields=base
)
if isinstance(normalized, dict) and not normalized.get('parsed', True):
statement_dict = dict(statement)
statement_dict['parsed'] = False
statement_dict['valid'] = False
statement_dict['invalid_reasons'] = normalized.get('invalid_reasons', [])
logger.debug(
f"Endorse statement was unable to normalize: {statement_dict.get('statement_text')} | Reason: {statement_dict.get('invalid_reasons')}"
)
self.cross_tenancy_statements.append(statement_dict)
return False
self.cross_tenancy_statements.append(normalized)
logger.debug(f'Endorse Statement Added: {normalized}')
return True
except Exception as ex:
statement['valid'] = False
statement['parsed'] = False
statement['invalid_reasons'] = [f'Normalize endorse parser failed: {ex}']
logger.debug(f'Normalize endorse parser failed: {ex}')
self.cross_tenancy_statements.append(statement)
return False
def _resolve_ocid_subjects_in_statement(self, stmt: RegularPolicyStatement):
"""
If the statement has subject_type group or dynamic-group and all subjects are OCIDs,
replace each OCID with (domain, name) if resolvable, otherwise ('Unknown', ocid).
Mark as invalid if any unresolved OCIDs. Add parsing_notes for both resolution and unresolved cases.
This is done in-place on the statement dict.
"""
subject_type = stmt.get('subject_type')
subjects = stmt.get('subject', [])
if not (subject_type in ('group', 'dynamic-group') and isinstance(subjects, list)):
return
# Detect if all subjects are in OCID format (no tuple/list inside)
all_ocids = all(isinstance(s, str) and s.lower().startswith('ocid1.') for s in subjects)
if not all_ocids:
return
resolved_subjects = []
unresolved_ocids = []
for ocid in subjects:
if subject_type == 'group':
grp = next((g for g in self.groups if g.get('group_ocid', '').lower() == ocid.lower()), None)
if grp:
dom = grp.get('domain_name') or 'Default'
name = grp.get('group_name') or ocid
resolved_subjects.append((dom, name))
else:
resolved_subjects.append(('Unknown', ocid))
unresolved_ocids.append(ocid)
elif subject_type == 'dynamic-group':
dg = next(
(d for d in self.dynamic_groups if d.get('dynamic_group_ocid', '').lower() == ocid.lower()), None
)
if dg:
dom = dg.get('domain_name') or 'Default'
name = dg.get('dynamic_group_name') or ocid
resolved_subjects.append((dom, name))
else:
resolved_subjects.append(('Unknown', ocid))
unresolved_ocids.append(ocid)
stmt['subject'] = resolved_subjects
notes = stmt.setdefault('parsing_notes', [])
if len(unresolved_ocids) > 0:
notes.append(f"Failed to resolve OCID(s): {', '.join(unresolved_ocids)}; inserted as ('Unknown', ocid)")
stmt['valid'] = False
else:
notes.append('All OCID subject(s) resolved to domain/name tuple(s).')
def _parse_statement(self, policy: BasePolicy, statement: RegularPolicyStatement) -> bool:
"""
This is now a thin wrapper calling the centralized PolicyStatementNormalizer.
"""
try:
base = {
k: statement[k]
for k in [
'policy_name',
# 'policy_description',
'policy_ocid',
'compartment_ocid',
'compartment_path',
'creation_time',
'internal_id',
]
if k in statement
}
normalized = self.normalizer.normalize(
statement_text=statement['statement_text'], statement_type='regular', base_fields=base
)
if isinstance(normalized, dict) and not normalized.get('parsed', True):
statement_dict = dict(statement)
statement_dict['action'] = 'unknown'
statement_dict['parsed'] = False
statement_dict['valid'] = False
statement_dict['invalid_reasons'] = normalized.get('invalid_reasons', [])
logger.debug(
f"Regular statement was unable to normalize: {statement_dict.get('statement_text')} | Reason: {statement_dict.get('invalid_reasons')}"
)
logger.debug(f'Full invalid statement data: {statement_dict}')
self.regular_statements.append(statement_dict)
return False
# OCID subject resolution step
self._resolve_ocid_subjects_in_statement(normalized)
self.regular_statements.append(normalized)
logger.debug(f'Regular Policy Statement Parsed: {normalized}')
logger.debug(f'Regular Policy Statement Parsed: {normalized}')
return True
except Exception as ex:
statement['parsed'] = False
statement['valid'] = False
statement['invalid_reasons'] = [f'Normalize regular policy parser failed: {ex}']
logger.debug(f'Normalize regular policy parser failed: {ex}')
self.regular_statements.append(statement)
return False
def _parse_dynamic_group(self, domain, dg: DynamicResourceGroup) -> DynamicGroup:
"""Extract the contents of the DG into a dict"""
logger.debug(f'Created by: {dg.idcs_created_by}')
return DynamicGroup(
domain_name=domain.display_name,
domain_ocid=domain.id,
dynamic_group_name=dg.display_name,
dynamic_group_id=dg.id,
description=dg.description or '',
matching_rule=dg.matching_rule,
in_use=True, # Placeholder until analysis is run
dynamic_group_ocid=dg.ocid,
creation_time=str(dg.meta.created),
created_by_ocid=dg.idcs_created_by.ocid if dg.idcs_created_by else None,
created_by_name=dg.idcs_created_by.display if dg.idcs_created_by else None,
)
# --- Main Data Loading Functions for Tenancy ---
[docs]
def load_policies_and_compartments(self) -> bool: # noqa: C901
"""
Optimized bulk loading of all compartments and all policies using OCI Clients.
1. Fetch compartments (hierarchy, flat)
2. Fetch policies (threaded fetch/parse) from OCI Resource Search
3. No queue or milestone progress emission
"""
self.compartments = []
self.policies = []
self.regular_statements: list[RegularPolicyStatement] = []
self.cross_tenancy_statements: list[BasePolicyStatement] = []
self.defined_aliases: list[DefineStatement] = []
start_time = time.perf_counter()
try:
logger.info('Bulk fetching all compartments...')
root_comp_response = self.identity_client.get_compartment(compartment_id=self.tenancy_ocid)
if not root_comp_response or not root_comp_response.data:
logger.error(f'Failed to get root compartment: {self.tenancy_ocid}')
return False
root_comp = root_comp_response.data
comp_response = pagination.list_call_get_all_results(
self.identity_client.list_compartments,
self.tenancy_ocid,
access_level='ACCESSIBLE',
sort_order='ASC',
compartment_id_in_subtree=True,
lifecycle_state='ACTIVE',
limit=1000,
)
all_comps = [root_comp] + (list(comp_response.data) if comp_response and comp_response.data else [])
logger.info(f'Total compartments loaded: {len(all_comps)}')
self.compartments = []
for comp in all_comps:
self.compartments.append(
{
'id': comp.id,
'name': comp.name if comp.id != self.tenancy_ocid else 'ROOT',
'parent_id': comp.compartment_id,
'hierarchy_path': None,
'description': comp.description if hasattr(comp, 'description') else None,
}
)
logger.info('Building compartment hierarchy paths and lookup tables...')
for compartment in self.compartments:
compartment['hierarchy_path'] = self._get_hierarchy_path_for_compartment(compartment, '')
logger.info(
'Bulk fetching all policies for all compartments using Resource Search or tenancy-wide method...'
)
# This query should be different if we want to limit to root compartment only
if self.recursive:
policy_query = 'query policy resources'
else:
policy_query = f"query policy resources where compartmentId = '{self.tenancy_ocid}'"
policy_search_results = self.resource_search_client.search_resources(
search_details=StructuredSearchDetails(type='Structured', query=policy_query), limit=1000
)
if policy_search_results and policy_search_results.data and policy_search_results.data.items:
logger.info(
f'Found {len(policy_search_results.data.items)} policies via Resource Search (recursive={self.recursive}).'
)
total_policies = len(policy_search_results.data.items)
def _process_policy_resource(item, position, total_policies):
policy_ocid = item.identifier
compartment_ocid = item.compartment_id
try:
policy_response = self.identity_client.get_policy(policy_id=policy_ocid)
if policy_response and policy_response.data:
policy_obj = BasePolicy(
policy_ocid=policy_response.data.id,
policy_name=policy_response.data.name,
description=policy_response.data.description or '',
compartment_ocid=policy_response.data.compartment_id,
creation_time=policy_response.data.time_created,
)
self.policies.append(policy_obj)
for statement in policy_response.data.statements:
# DO NOT lowercase statement text - preserve original case
hierarchy_path = next(
(
comp['hierarchy_path']
for comp in self.compartments
if comp['id'] == compartment_ocid
),
'UNKNOWN_PATH',
)
base_policy_statement: BasePolicyStatement = BasePolicyStatement(
policy_name=policy_response.data.name,
policy_ocid=policy_response.data.id,
# policy_description=policy_response.data.description or '',
compartment_ocid=policy_response.data.compartment_id,
compartment_path=hierarchy_path,
statement_text=statement,
creation_time=str(policy_response.data.time_created),
internal_id=hashlib.md5((statement + policy_response.data.id).encode()).hexdigest(),
parsed=False,
)
st_text_lower = statement.strip().lower()
if st_text_lower.startswith('define'):
define_statement: DefineStatement = DefineStatement(**base_policy_statement)
self._parse_define_statement(policy_obj, define_statement)
elif (
st_text_lower.startswith('admit')
or st_text_lower.startswith('endorse')
or st_text_lower.startswith('deny admit')
or st_text_lower.startswith('deny endorse')
):
if st_text_lower.startswith('admit') or st_text_lower.startswith('deny admit'):
admit_statement: AdmitStatement = AdmitStatement(**base_policy_statement)
self._parse_admit_statement(policy_obj, admit_statement)
elif st_text_lower.startswith('endorse') or st_text_lower.startswith(
'deny endorse'
):
endorse_statement: EndorseStatement = EndorseStatement(**base_policy_statement)
self._parse_endorse_statement(policy_obj, endorse_statement)
else:
policy_statement: RegularPolicyStatement = RegularPolicyStatement(
**base_policy_statement
)
self._parse_statement(policy_obj, policy_statement) # include validation as before
except Exception as e:
logger.warning(f'Failed to get policy {policy_ocid}: {e}')
with ThreadPoolExecutor(max_workers=THREADS) as executor:
for idx, item in enumerate(policy_search_results.data.items):
executor.submit(_process_policy_resource, item, idx, total_policies)
self.data_as_of = str(datetime.now(UTC))
total_time = time.perf_counter() - start_time
total_time = time.perf_counter() - start_time
logger.info(
f'Bulk loaded {len(self.compartments)} compartments and {len(self.regular_statements)} policy statements in {total_time:.2f}s'
f'Bulk loaded {len(self.compartments)} compartments and {len(self.regular_statements)} policy statements in {total_time:.2f}s'
)
# Return True because we loaded successfully
self.policies_loaded_from_tenancy = True
return True
except Exception as e:
logger.error(f'Failed to load policies and compartments: {e}')
return False
[docs]
def load_complete_identity_domains(self, load_all_users: bool = True) -> bool: # noqa: C901
"""Loads everything into the cetntral JSON
Identity Domains are loaded via the Identity Client.
For each Identity Domain, load the Dynamic Groups, Groups, and Users
Args:
load_all_users (bool): If False, skip loading users. Default is True (backwards compatible).
Returns:
A boolean indicating success of the data load. False indicates there was some failure in loading data,
so it may be incomplete.
"""
try:
domain_response = self.identity_client.list_domains(compartment_id=self.tenancy_ocid) # type: ignore
if domain_response.data is None: # type: ignore
logger.error('Failed to list identity domains')
return False
# Should we really keep the full thing?
self.identity_domains = domain_response.data
logger.info(f'Loaded {len(self.identity_domains)} identity domains')
self.domain_clients = {}
for domain in self.identity_domains:
try:
# Get IdentityDomainsClient and hold on to it
if self.use_instance_principal:
domain_client = IdentityDomainsClient(
config={}, signer=self.signer, service_endpoint=domain.url
)
elif self.session_token:
logger.info('Session auth for IdentityDomainsClient')
self.config = config.from_file(profile_name=self.session_token)
token_file = self.config['security_token_file']
token = None
with open(token_file) as f:
token = f.read()
private_key = load_private_key_from_file(self.config['key_file'])
self.signer = SecurityTokenSigner(token, private_key)
domain_client = IdentityDomainsClient(
{'region': self.config['region']}, signer=self.signer, service_endpoint=domain.url
)
self.tenancy_ocid = self.config['tenancy']
logger.info('Success session auth')
else:
domain_client = IdentityDomainsClient(config=self.config, service_endpoint=domain.url)
self.domain_clients[domain.id] = domain_client
# Load Dynamic Groups
# Now we need to get each one and cause additional calls
dg_response = domain_client.list_dynamic_resource_groups(attribute_sets=['never'])
# dg_response = domain_client.list_dynamic_resource_groups(attributes='matching_rule')
if dg_response and dg_response.data:
logger.debug(
f'Got the List of DG for {domain.display_name}. Count: {len(dg_response.data.resources)}'
)
for _dg in dg_response.data.resources:
# Do a full on get to get all attributes
full_dg = domain_client.get_dynamic_resource_group(
dynamic_resource_group_id=_dg.id, attribute_sets=['all']
).data
dg = full_dg
logger.debug(f'DG: {dg.display_name} Matching Rule: {dg.matching_rule}')
# Append the Dynamic Group dict to the list
self.dynamic_groups.append(self._parse_dynamic_group(domain=domain, dg=dg))
else:
logger.error('Failed to list dynamic groups')
return False
# Load Groups
start_index = 1
limit = 1000
while True:
group_response = domain_client.list_groups(
start_index=start_index, count=limit, sort_by='displayName', sort_order='ASCENDING'
)
if group_response.data is None or not group_response.data.resources:
break
for g in group_response.data.resources:
logger.debug(f'Group: {g}')
# Set the group into the bigger picture JSON
self.groups.append(
Group(
domain_name=domain.display_name,
group_name=g.display_name,
group_ocid=g.ocid,
group_id=g.id,
description=g.urn_ietf_params_scim_schemas_oracle_idcs_extension_group_group.description
if g.urn_ietf_params_scim_schemas_oracle_idcs_extension_group_group
else '',
)
)
# Logic to re-start new request
if (
len(group_response.data.resources) < limit
or start_index + limit > group_response.data.total_results
):
break
start_index += limit
logger.debug(f'All Groups: {self.groups}')
# --- LOAD USERS if enabled ---
if load_all_users:
start_index = 1
while True:
user_response = domain_client.list_users(
start_index=start_index,
count=limit,
sort_by='displayName',
sort_order='ASCENDING',
attribute_sets=['never'],
)
if user_response.data is None or not user_response.data.resources:
break
for u in user_response.data.resources:
logger.debug(f'User: {u}')
user_attributes = domain_client.get_user(user_id=u.id, attribute_sets=['all']).data
# Print this for now
logger.debug(f'***User Attributes: {user_attributes}')
groups_list = []
# If there are groups, loop them
if user_attributes.groups:
logger.debug(f'User {u.display_name} Groups: {user_attributes.groups}')
for gg in user_attributes.groups:
groups_list.append(gg.ocid)
else:
logger.debug(f'No groups for user {u.display_name}')
# Default the email to None
email = 'None'
if hasattr(user_attributes, 'emails') and user_attributes.emails:
for em in user_attributes.emails:
if em.primary:
email = em.value
break
else:
logger.debug(f'No emails for user {u.display_name}')
# Set the user into the bigger picture JSON
self.users.append(
User(
domain_name=domain.display_name,
user_name=u.user_name,
user_ocid=u.ocid,
display_name=u.display_name,
email=email,
user_id=u.id,
groups=groups_list,
)
)
# Loop Logic
if (
len(user_response.data.resources) < limit
or start_index + limit > user_response.data.total_results
):
break
start_index += limit
logger.debug(f'All Users: {self.users}')
else:
self.users = []
self.data_as_of = str(datetime.now(UTC))
# Indicate we loaded successfully
except Exception as e:
logger.error(f'Failed to load groups/users for domain {domain.id}: {e}')
raise
logger.info(
f'Loaded {len(self.groups)} groups, {len(self.users)} users, {len(self.dynamic_groups)} dynamic groups across all domains'
)
# Set this so that callback can stop any waiting
self.identity_loaded_from_tenancy = True
return True
except Exception as e:
logger.error(f'Failed to load identity domains: {e}')
# return False
raise e
# --- Main Filtering Functions ---
# Filtering logic - return a list of policy statements matching given filter
# Single policy filter function that resolves fuzzy search if provided, exact search if provided, and then other criteria if provided
# If multiple criteria are provided, they are ANDed together
# If multiple values are provided for a single criteria, they are ORed together
# If no criteria are provided, return all policy statements
# If no policy statements exist, return empty list
# Fuzzy and Exact search are mutually exclusive - if both are provided, fuzzy search is used
# If Identity Domains are not loaded and either fuzzy or exact search is requested, raise an error
[docs]
def filter_policy_statements(self, filters: PolicySearch) -> list[RegularPolicyStatement]: # noqa: C901
"""
Filter policy statements by one or more criteria.
Args:
filters (PolicySearch): Dictionary of filter keys and their values (e.g. verb, resource, permission, group, etc).
Returns:
list[PolicyStatement]: List of statements matching the filter.
"""
logger.debug(f'Filtering policy statements with criteria: {filters}')
# If fuzzy or exact search is requested, identity domains must be loaded. If not, raise an error
# Previously, filtering by group/user/dynamic-group required identity_domains_loaded.
# This check and logic has been removed per requirements; filtering will proceed regardless.
# If fuzzy search is provided, use it and ignore exact search.
self._resolve_fuzzy_search(filters=filters)
# If exact users were provided for filtering, resolve them to domain/name tuples
self._resolve_exact_users(filters=filters)
# At this point we have exact groups or exact dynamic groups to deal with
logger.info(f'Post-fuzzy/exact search filters: {filters}')
# Apply regular search - AND all provided fields except fuzzy search
results = []
for stmt in self.regular_statements:
match = True
for key, values in filters.items():
if key == 'exact_groups':
# Get the groups from the exact filter
logger.debug(f'Filtering on exact_groups with values: {values}')
groups_filter = filters.get('exact_groups', None)
# Only applies to statements where "subject_type" == "group"
if stmt.get('subject_type') != 'group':
logger.debug(f"Rejecting {stmt.get('policy_name')} due to subject_type not 'group'")
match = False
break
subjects = stmt.get('subject', [])
if not isinstance(subjects, list):
logger.warning(f'Unexpected Subject format in statement {stmt.get("policy_name")}: {subjects}')
match = False
break
if len(groups_filter) == 0:
logger.debug('No groups in exact_groups filter, thus no match possible')
match = False
break
# A match occurs if any provided domain and group name combo matches any subject in the statement (case-insensitive)
subj_matched = False
for subj_domain, subj_name in subjects:
# Now we need to iterate the provided groups and see if any match
for group in groups_filter:
group_domain = group.get('domain_name') or 'default'
group_name = group.get('group_name')
if (
subj_domain.casefold() == group_domain.casefold()
and subj_name.casefold() == group_name.casefold()
):
logger.debug(
f'Matched group {subj_domain}/{subj_name} in statement {stmt.get("policy_name")} to filter group {group_domain}/{group_name}'
)
subj_matched = True
if not subj_matched:
logger.debug(
f'No match found for exact_group filter in statement {stmt.get("policy_name")} Text: {stmt.get("statement_text")} Statement: {stmt.get("subject")}'
)
match = False # If we get here, no match found
break
# For exact dynamic group, similar logic
elif key == 'exact_dynamic_groups' and values:
logger.debug(f'Filtering on exact_dynamic_groups with values: {values}')
dyn_groups_filter = filters.get('exact_dynamic_groups', [])
if stmt.get('subject_type') != 'dynamic-group':
logger.debug(f"Rejecting {stmt.get('policy_name')} due to Subject Type not 'dynamic-group'")
match = False
break
subjects = stmt.get('subject', [])
if not isinstance(subjects, list):
logger.warning(f'Unexpected Subject format in statement {stmt.get("policy_name")}: {subjects}')
match = False
break
subj_matched = False
for subj_domain, subj_name in subjects:
for dg in dyn_groups_filter:
dg_domain = dg.get('domain_name') or 'default'
dg_name = dg.get('dynamic_group_name')
if (
subj_domain.casefold() == dg_domain.casefold()
and subj_name.casefold() == dg_name.casefold()
):
logger.debug(
f'Matched dynamic group {subj_domain}/{subj_name} in statement {stmt.get("policy_name")} to filter group {dg_domain}/{dg_name}'
)
subj_matched = True
if not subj_matched:
logger.debug(
f'No match found for exact_dynamic_groups filter in statement {stmt.get("policy_name")} Text: {stmt.get("statement_text")} Statement: {stmt.get("subject")}'
)
match = False # If we get here, no match found
break
# Compartment special: ROOTONLY
elif key == 'policy_compartment' and 'ROOTONLY' in values:
if stmt.get('compartment_ocid') != self.tenancy_ocid:
logger.debug(f'Rejecting {stmt.get("policy_name")} due to ROOTONLY restriction')
match = False
break
elif key == 'location' and 'tenancy' in values:
if stmt.get('location_type', '').casefold() != 'tenancy':
logger.debug(f'Rejecting {stmt.get("policy_name")} due to location not tenancy')
match = False
break
# Once domain cases are done, iterate remaining values
# Verb enum
elif key == 'verb':
invalid = set(values) - VALID_VERBS
if invalid:
logger.debug(f'Invalid verbs in filter: {invalid}')
field_value = str(stmt.get('verb', '')).lower()
if field_value not in values:
logger.debug(f'Rejecting {stmt.get("policy_name")} due to verb mismatch: {field_value}')
match = False
break
# Validity check
elif key == 'valid':
valid_value = values
statement_valid_value = stmt.get('valid', False)
logger.debug(f'Filtering on validity: {valid_value} vs {statement_valid_value}')
if valid_value != statement_valid_value:
logger.debug(f'Rejecting {stmt.get("policy_name")} due to validity mismatch')
match = False
break
# Effective path search
elif key == 'effective_path':
filter_eff_value = values[0].lower()
statement_eff_value = str(stmt.get('effective_path', '')).lower()
logger.debug(f'Filtering on filt/st {filter_eff_value} vs {statement_eff_value}')
# Logic here - if the effective path given contains the effective path of the statement,
# then it is a match. This allows searching for all policies effective in a given compartment and its children.
if not (filter_eff_value.startswith(statement_eff_value)):
logger.debug(
f'Rejecting {stmt.get("policy_name")} due to effective_path mismatch: '
f'{statement_eff_value} not in {filter_eff_value}'
)
match = False
break
# Default lookup using column map
else:
column = key
logger.debug(f'Filtering on {key} mapped to column {column} with values {values}')
if not column or not values:
logger.debug(f'Unknown filter key: {key} or values empty, skipping')
continue
field_value = str(stmt.get(column, '')).lower()
if not any(val.lower() in field_value for val in values):
logger.debug(f'Rejecting {stmt.get("policy_name")} due to {key} mismatch')
match = False
break
if match:
results.append(stmt)
logger.info(f'Filter applied. {len(results)} matched out of {len(self.regular_statements)} Regular statements.')
return results
[docs]
def filter_cross_tenancy_policy_statements(self, alias_filter: list[str]) -> list[RegularPolicyStatement]:
"""
Filter cross-tenancy policy statements containing any provided alias.
Args:
alias_filter (list[str]): List of aliases to look for in statement text.
Returns:
list[PolicyStatement]: Filtered cross-tenancy policy statements.
"""
filtered = []
for statement in self.cross_tenancy_statements:
for alias_to_check in alias_filter:
# Check each alias to see if in statement text
statement_text = statement.get('statement_text', '')
if alias_to_check in statement_text:
logger.debug(f'Adding statement (alias={alias_to_check}): {statement_text}')
filtered.append(statement)
logger.info(f'Returning {len(filtered)} Cross-Tenancy Results')
return filtered
# -- Identity Domain Related Filtering Functions ---
[docs]
def get_users_for_group(self, group: Group) -> list[User]:
"""
Return all users that belong to the specified exact group. Membership is determined by matching the group name and domain name.
Args:
group (Group): A dictionary with keys:
- 'domain': str | None
- 'name': str
Returns:
list[User]: A list of Users that belong to the specified group. If the group does not exist or has no members, returns an empty list.
"""
group_domain = group.get('domain_name') or 'default'
group_name = group['group_name']
logger.debug(f'Number of groups: {len(self.groups)} Number of users: {len(self.users)}')
# Get GID (as it is used by users)
group_ocid = None
for g in self.groups:
if (
g.get('group_name', '').casefold() == group_name.casefold()
and g.get('domain_name', '').casefold() == group_domain.casefold()
):
group_ocid = g.get('group_ocid')
break
if not group_ocid:
logger.warning(f'Group not found: {group_domain}/{group_name}')
return []
logger.debug(f'Group OCID: {group_ocid}')
# now iterate users and see if any have that OCID in their groups field
matched_users = [u for u in self.users if group_ocid in u.get('groups', [])]
logger.info(f'Found {len(matched_users)} users for group {group_domain}/{group_name}')
return matched_users
[docs]
def get_groups_for_user(self, user: User) -> list[Group]:
"""Return the list of all Groups that a user is a member of
Args:
user (User): The user to find groups for.
Returns:
list[Group]: A list of Groups that the user is a member of.
"""
groups_for_user: list[Group] = []
logger.info(f'User to filter: {user}')
logger.debug(f'Users: {self.users}')
# Iterate through users to find our user
for u in self.users:
# Match the tuple
if (
u.get('user_name', '').casefold() == user.get('user_name').casefold()
and u.get('domain_name', 'default').casefold() == user.get('domain_name', 'default').casefold()
):
logger.debug(f'User found. Groups: {u.get("groups")}')
# hold that thought...
for user_group_ocid in u.get('groups', []):
# Find the Group OCID in the groups and append
for g in self.groups:
if g.get('group_ocid') == user_group_ocid:
# Now append as tuple
groups_for_user.append(g)
logger.debug(f'Adding Group {g.get("domain_name")} / {g.get("group_name")} ')
logger.info(f'Found {len(groups_for_user)} groups for user {user.get("domain_name")} / {user.get("user_name")}')
return groups_for_user
def _user_search_internal(self, user_filter: UserSearch) -> list[User]:
"""
Search for users based on the provided filter.
Using the internal names in the User object
"""
logger.info(f'User filter to check: {user_filter}')
users_return: list[User] = []
for u in self.users:
# for uu in user_filter:
matches_domain = not user_filter.get('domain_name') or any(
term.lower() in str(u.get('domain_name')).lower() for term in user_filter.get('domain_name')
)
matches_username = not user_filter.get('search') or any(
term.lower() in str(u.get('username')).lower() for term in user_filter.get('search')
)
matches_display = not user_filter.get('search') or any(
term.lower() in str(u.get('display_name')).lower() for term in user_filter.get('search')
)
matches_ocid = not user_filter.get('user_ocid') or any(
term.lower() in str(u.get('user_ocid')).lower() for term in user_filter.get('user_ocid')
)
# If any match (OR), then get groups and add to exact match
if matches_domain and (matches_username or matches_display) and matches_ocid:
# get groups for user
logger.debug(f'Found a user match: {u} / {user_filter}')
users_return.append(u)
logger.info(f'User Search got {len(users_return)} users')
return users_return
def _group_search_internal(self, group_filter: GroupSearch) -> list[Group]:
"""
Search for groups based on the provided filter.
Using the internal names in the User object
"""
logger.info(f'Group filter to check: {group_filter}')
groups_return: list[Group] = []
for g in self.groups:
matches_name = not group_filter.get('group_name') or any(
term in str(g.get('group_name')).lower() for term in group_filter.get('group_name')
)
matches_domain = not group_filter.get('domain_name') or any(
term in str(g.get('domain_name')).lower() for term in group_filter.get('domain_name', ['default'])
)
matches_ocid = not group_filter.get('group_ocid') or any(
term in str(g.get('group_ocid')).lower() for term in group_filter.get('group_ocid')
)
if matches_name and matches_domain and matches_ocid:
groups_return.append(g)
logger.info(f'Group Search returning {len(groups_return)} groups')
return groups_return
def _dynamic_group_search_internal(self, dg_filter: DynamicGroupSearch) -> list[DynamicGroup]:
"""Search for dynamic groups based on the provided filter."""
logger.info(f'Dynamic Group filter to check: {dg_filter}')
dgs_return: list[DynamicGroup] = []
for dg in self.dynamic_groups:
matches_name = not dg_filter.get('dynamic_group_name') or any(
term in str(dg.get('dynamic_group_name')).lower() for term in dg_filter.get('dynamic_group_name')
)
matches_domain = not dg_filter.get('domain_name') or any(
term in str(dg.get('domain_name')).lower() for term in dg_filter.get('domain_name', ['default'])
)
matches_ocid = not dg_filter.get('dynamic_group_ocid') or any(
term in str(dg.get('dynamic_group_ocid')).lower() for term in dg_filter.get('dynamic_group_ocid')
)
matches_rule = not dg_filter.get('matching_rule') or any(
term in str(dg.get('matching_rule')).lower() for term in dg_filter.get('matching_rule')
)
matches_description = not dg_filter.get('description') or any(
term in str(dg.get('description')).lower() for term in dg_filter.get('description')
)
if matches_name and matches_domain and matches_ocid and matches_rule and matches_description:
dgs_return.append(
{
'domain_name': dg.get('domain_name'),
'dynamic_group_name': dg.get('dynamic_group_name'),
'dynamic_group_ocid': dg.get('dynamic_group_ocid'),
}
)
logger.info(f'Dynamic Group Search returning {len(dgs_return)} dynamic groups')
return dgs_return
def _resolve_fuzzy_search(self, filters: PolicySearch): # noqa: C901
"""Look for fuzzy search and turn it into an exact search"""
logger.debug(f'Resolve fuzzy Groups: {filters.get("search_groups")}')
logger.debug(f'Resolve fuzzy Users: {filters.get("search_users")}')
logger.debug(f'Resolve fuzzy DG: {filters.get("search_dynamic_groups")}')
# First do fuzzy user search
if filters.get('search_users'):
user_filter: UserSearch = filters.get('search_users')
logger.info(f'User filter to check: {user_filter}')
filtered_users = self._user_search_internal(user_filter)
logger.info(f'User search returned {len(filtered_users)} users')
# Now, for each user, get their groups and add to exact groups
exact_groups: list[Group] = []
for u in filtered_users:
user_groups: list[Group] = self.get_groups_for_user(u)
exact_groups.extend(user_groups)
# De-dup exact groups
seen = set()
deduplicated_list = []
for group in exact_groups:
identifier = (group.get('domain_name') or 'Default', group.get('group_name'))
if identifier not in seen:
seen.add(identifier)
deduplicated_list.append(group)
exact_groups = deduplicated_list
# Set exact groups into filter that was passed in
filters['exact_groups'] = exact_groups
del filters['search_users']
logger.info(f'Added {len(exact_groups)} exact groups to filter (removed fuzzy user search)')
# Next, fuzzy group search
elif filters.get('search_group'):
group_filter: GroupSearch = filters.get('search_groups')
exact_groups: list[Group] = self._group_search_internal(group_filter)
# De-dup exact groups
seen = set()
deduplicated_list = []
for group in exact_groups:
identifier = (group.get('domain_name') or 'Default', group.get('group_name'))
if identifier not in seen:
seen.add(identifier)
deduplicated_list.append(group)
exact_groups = deduplicated_list
# Set exact groups into filter that was passed in
filters['exact_groups'] = exact_groups
# remove the fuzzy search
del filters['search_groups']
logger.info(f'Added {len(exact_groups)} exact groups to filter')
# Finally, fuzzy dynamic group search
elif filters.get('search_dynamic_groups'):
dg_filter: DynamicGroupSearch = filters.get('search_dynamic_groups')
exact_dgs: list[DynamicGroup] = self._dynamic_group_search_internal(dg_filter)
# Set exact DGs into filter that was passed in
filters['exact_dynamic_groups'] = exact_dgs
# Remove fuzzy search
del filters['search_dynamic_groups']
logger.info(f'Added {len(exact_dgs)} exact dynamic groups to filter (removed fuzzy dynamic group search)')
else:
logger.debug('No fuzzy logic executed, search not changed.')
def _resolve_exact_users(self, filters: PolicySearch):
"""Look for exact users and turn them into groups"""
if not filters.get('exact_users'):
return
user_filter: list[User] = filters.get('exact_users')
logger.info(f'Exact User filter to check: {user_filter}')
# Start with no groups and iterate users
exact_groups: list[Group] = []
for u in self.users:
# We need an exact match on domain and username
user_domain = u.get('domain_name') or 'default'
user_name = u.get('user_name')
for filter_user in user_filter:
filter_domain = filter_user.get('domain_name') or 'default'
filter_name = filter_user.get('user_name')
logger.debug(
f'Checking actual user {user_domain}/{user_name} against filter user {filter_domain}/{filter_name}'
)
if (
filter_domain.casefold() == user_domain.casefold()
and filter_name.casefold() == user_name.casefold()
):
# get groups for user
logger.debug(f'Exact user match found: {user_domain}/{user_name}')
uu: User = {'domain_name': user_domain, 'user_name': user_name} # type: ignore
user_groups: list[Group] = self.get_groups_for_user(uu)
logger.debug(f'User groups: {user_groups}')
# add groups into exact match in filter
exact_groups.extend(user_groups)
# De-dup exact groups
seen = set()
deduplicated_list = []
for group in exact_groups:
identifier = (group.get('domain_name') or 'Default', group.get('group_name'))
if identifier not in seen:
seen.add(identifier)
deduplicated_list.append(group)
exact_groups = deduplicated_list
# Set exact groups into filter that was passed in
filters['exact_groups'] = exact_groups
del filters['exact_users']
logger.info(f'Exact User Search {len(exact_groups)} exact groups to filter (removed exact user search)')
[docs]
def filter_groups(self, group_filter: GroupSearch) -> list[Group]:
"""Filter groups based on the provided filter. Public function used by MCP or UI"""
filtered = []
logger.info(f'Filtering Groups based on: {group_filter}')
filtered: list[Group] = self._group_search_internal(group_filter)
logger.info(f'Filtered to {len(filtered)} groups')
return filtered
[docs]
def filter_users(self, user_filter: UserSearch) -> list[User]:
"""
Filter users based on the provided filter.
This function is used by the MCP interface and the UI.
Args:
user_filter (UserSearch):
A dictionary with optional keys.
* ``domain_name`` (list[str]): Domain names to filter by (case-insensitive).
* ``search`` (list[str]): Search terms to match against usernames and display names (case-insensitive).
* ``user_ocid`` (list[str]): User OCIDs to filter by (case-insensitive).
Returns:
list[User]:
Users that match the filter criteria.
Each :class:`User` is represented as a dictionary with keys:
* ``domain_name`` (str | None): Domain name of the user.
* ``user_name`` (str): Username.
* ``user_ocid`` (str): OCID of the user.
* ``display_name`` (str): Display name of the user.
* ``email`` (str): Email of the user.
* ``user_id`` (str): Internal ID of the user.
* ``groups`` (list[str]): Group OCIDs the user belongs to.
"""
logger.info(f'Filtering Users (public) based on: {user_filter}')
filtered_users: list[User] = self._user_search_internal(user_filter)
logger.info(f'Filtered to {len(filtered_users)} users')
for u in filtered_users:
logger.debug(f'User: {u.get("domain_name")}/{u.get("user_name")} Name:"{u.get("display_name")}"')
return filtered_users
[docs]
def filter_dynamic_groups(self, filters: DynamicGroupSearch) -> list[DynamicGroup]:
"""
Filter dynamic groups using JSON-based filters.
Args:
filters (DynamicGroupSearch): A mapping of filter keys to one or more values.
- **OR**: multiple values within a field act as logical OR.
- **AND**: multiple fields are combined as logical AND.
**Supported keys:**
* ``domain_name`` → matches "Domain"
* ``dynamic_group_name`` → matches "DG Name"
* ``matching_rule`` → matches "Matching Rule"
* ``dynamic_group_ocid`` → matches "DG OCID"
* ``in_use`` → matches "In Use" (True/False)
Returns:
list[DynamicGroup]: A list of dynamic groups that satisfy the filters.
Each dynamic group is represented as a dictionary with keys:
* ``domain_name`` (str | None): The domain name of the dynamic group.
* ``dynamic_group_name`` (str): The name of the dynamic group.
* ``dynamic_group_id`` (str): The ID of the dynamic group.
* ``dynamic_group_ocid`` (str): The OCID of the dynamic group.
* ``matching_rule`` (str): The matching rule of the dynamic group.
* ``description`` (str): The description of the dynamic group.
* ``in_use`` (bool): Whether the dynamic group is in use.
* ``creation_time`` (str): The creation timestamp of the dynamic group.
* ``created_by_name`` (str): The name of the user who created the dynamic group.
* ``created_by_ocid`` (str): The OCID of the user who created the dynamic group.
Raises:
ValueError: If an unknown filter key is provided.
"""
results = []
logger.info(f'Filtering Dynamic Groups based on: {filters}')
for dg in self.dynamic_groups:
match = True
for key, values in filters.items():
# Check in-use first because it is special
if key == 'in_use':
if not values and not dg.get('in_use', False):
logger.debug(
f'DG included {dg.get("dynamic_group_name")} due to in_use match: {dg.get("in_use")} = {values}'
)
continue
else:
logger.debug(
f'DG rejected {dg.get("dynamic_group_name")} in_use: {dg.get("in_use")} != {values}'
)
match = False
break
elif not values:
logger.debug(f'Skipping empty filter for key: {key}')
continue
else:
values = [v.lower() for v in values]
logger.debug(f'Filtering on {key} mapped to column {key} with values {values}')
field_value = str(dg.get(key, '')).lower()
logger.debug(f'Field value for {key}: {field_value}')
if not any(val.lower() in field_value for val in values):
logger.debug(f'Rejecting DG {dg.get("DG Name")} due to {key} mismatch')
match = False
break
if match:
results.append(dg)
logger.info(f'Filter applied. {len(results)} matched out of {len(self.dynamic_groups)} Dynamic Groups.')
return results
# --- Other Public Functions ---
# Not in use
def _check_history(self, policy_ocid: str, start_time: str) -> None:
"""Look at audit logs to track changes to a policy"""
the_log = f'{self.tenancy_ocid}/_Audit'
logs_returned = self.logging_search_client.search_logs(
search_logs_details=SearchLogsDetails(
search_query=f"search \"{the_log}\" | (type in ('com.oraclecloud.identityControlPlane.UpdatePolicy','com.oraclecloud.identityControlPlane.CreatePolicy','com.oraclecloud.identityControlPlane.DeletePolicy')) | sort by datetime desc",
# search_query=f'search \"{the_log}\" where type=\'com.oraclecloud.identityControlPlane.UpdatePolicy\'',
time_start='2025-07-10T11:59:00Z',
time_end='2025-07-23T23:59:00Z',
),
limit=1000,
)
if logs_returned and logs_returned.data and logs_returned.data.results:
logger.info(f'Found {len(logs_returned.data.results)} logs for policy updates in the last 24 hours')
for log in logs_returned.data.results:
res: SearchResult = log
if res and res.data:
type_of_log = res.data.get('logContent').get('type')
change_curr = (
res.data.get('logContent').get('data').get('stateChange').get('current').get('statements')
)
change_prev = None
if (
res.data.get('logContent').get('data')
and res.data.get('logContent').get('data').get('stateChange')
and res.data.get('logContent').get('data').get('stateChange').get('previous')
):
# Previous state change exists
change_prev = (
res.data.get('logContent').get('data').get('stateChange').get('previous').get('statements')
)
logger.info(f'Log Type: {type_of_log}')
logger.info(f'***Log Details: Type: {type_of_log}Previous:{change_prev} Current:{change_curr}')
else:
logger.info('No policy update logs found in the last 24 hours')
pass
def _get_domains(self) -> list:
return [{'id': d.id, 'display_name': d.display_name, 'url': d.url} for d in self.identity_domains]
# --- Compliance Output Loading ---
# Because we are not using OCI clients here, we need to load from CSV files
# We need to load in this order:
# 1. Domains
# 2. Dynamic Groups
# 3. Users
# 3a. Augment users with group membership
# 4. Groups + Membership
# 5. Compartments
# 5a. Augment compartment data with path strings (cannot use client here)
# 6. Policies
def _get_domain_name_from_ocid(self, domain_ocid: str) -> str:
"""Given a domain OCID, return the domain name from loaded domains"""
if not domain_ocid or domain_ocid == '':
return 'Default'
for domain in self.identity_domains:
if domain.get('id') == domain_ocid:
return domain.get('display_name', 'Default')
return 'Default'
def _get_hierarchy_path_for_compartment(self, compartment, comp_string: str) -> str:
"""Given a compartment JSON dict, return the full hierarchy path as a string"""
# If OCID is the tenancy OCID, return ROOT
if compartment.get('id') == self.tenancy_ocid:
return 'ROOT'
path_parts = []
current_comp = compartment
while current_comp:
path_parts.append(current_comp.get('name', 'Unknown'))
parent_id = current_comp.get('parent_id')
if not parent_id or parent_id == current_comp.get('id'):
break
# Find parent compartment in loaded compartments
parent_comp = next((comp for comp in self.compartments if comp.get('id') == parent_id), None)
current_comp = parent_comp
# Reverse the path parts to get from root to leaf
path_parts.reverse()
full_path = '/'.join(path_parts)
logger.debug(f'Compartment {comp_string} full path: {full_path}')
return full_path
[docs]
def load_from_compliance_output_dir(self, dir_path: str, load_all_users: bool = True) -> bool: # noqa: C901
"""
Load all compartments, domains, groups, users, dynamic groups, and policies from compliance tool output files.
Starts with domains, then dynamic groups, then users/groups/membership, then compartments, then policies.
This function is for offline/compliance output analysis: no attempt to initialize any OCI client.
Args:
dir_path (str): Path to a directory containing the expected compliance output files.
load_all_users (bool): If False, skip loading users. Default is True.
Returns:
bool: True if all files parsed and data loaded successfully, False otherwise.
"""
logger.info(f'Loading compliance data from output dir: {dir_path}')
# We need to only use the CSV files and stop using the JSON file altogether
try:
# Step 1: Set the tenancy OCID and Name from the data
with open(os.path.join(dir_path, 'raw_data_identity_compartments.csv'), encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if row.get('id', '').startswith('ocid1.tenancy.'):
self.tenancy_ocid = row.get('id', '')
self.tenancy_name = row.get('name', '')
logger.info(f'Set tenancy OCID to {self.tenancy_ocid} and name to {self.tenancy_name}')
break
if not self.tenancy_ocid or not self.tenancy_name:
logger.error('Could not find tenancy OCID and name in compartments CSV')
return False
# --- Step 2: Load Dynamic Groups ---
dgs_file = os.path.join(dir_path, 'raw_data_identity_dynamic_groups.csv')
with open(dgs_file, encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
created_by = row.get('idcs_created_by', '{}')
try:
created_by_json = json.loads(created_by)
created_by_ocid = created_by_json.get('odid', 'n/a')
except json.JSONDecodeError:
created_by_ocid = 'n/a'
domain_ocid = row.get('domain_ocid', '')
domain_name = self._get_domain_name_from_ocid(domain_ocid)
dg: DynamicGroup = {
'domain_name': domain_name or 'Default',
'dynamic_group_name': row.get('display_name') or '',
'dynamic_group_id': 'n/a',
'dynamic_group_ocid': row.get('ocid', ''),
'matching_rule': row.get('matching_rule', ''),
'description': row.get('description') or '',
'in_use': True, # Default to True; will be updated later
'creation_time': 'n/a',
'created_by_name': 'n/a',
'created_by_ocid': created_by_ocid,
}
self.dynamic_groups.append(dg)
logger.info(f'Loaded {len(self.dynamic_groups)} dynamic groups from CSV')
# --- Step 3: Load Groups ---
groups_file = os.path.join(dir_path, 'raw_data_identity_groups_and_membership.csv')
user_membership: dict[str, list[str]] = {}
user_domains: dict[str, str] = {}
seen_groups = set()
with open(groups_file, encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
group: Group = {
'domain_name': row.get('domain_deeplink', '').split('","')[-1].rstrip('")')
if 'domain_deeplink' in row
else 'Default',
'group_name': row.get('name') or '',
'group_ocid': row.get('id') or '',
'description': row.get('description') or '',
'group_id': row.get('id') or '',
}
logger.debug(f'Processing group: {group}')
member_user_ocid = row.get('user_id', '')
if member_user_ocid and member_user_ocid != '':
if member_user_ocid not in user_membership:
user_membership[member_user_ocid] = []
user_membership[member_user_ocid].append(row.get('id'))
group_key = (group['domain_name'], group['group_name'])
if member_user_ocid and member_user_ocid != '':
user_domains[member_user_ocid] = group['domain_name']
if group_key in seen_groups:
continue
seen_groups.add(group_key)
self.groups.append(group)
logger.debug(f'Loaded {len(self.groups)} groups')
# --- Step 4: Load Users, unless disabled ---
self.users = []
if load_all_users:
users_file = os.path.join(dir_path, 'raw_data_identity_users.csv')
with open(users_file, encoding='utf-8') as f:
reader = csv.DictReader(f)
for user_item in reader:
logger.debug(f'Processing user item: {user_item}')
user: User = {
'domain_name': user_item.get('domain_deeplink', '').split('","')[-1].rstrip('")')
if 'domain_deeplink' in user_item
else 'Default',
'user_name': user_item.get('name') or '', # No way to get username or email
'user_ocid': user_item.get('id') or '',
'display_name': user_item.get('name') or '',
'email': user_item.get('email') or '',
'user_id': user_item.get('external_identifier') or '',
'groups': [],
}
group_names_str = user_item.get('groups', '') or ''
group_names = eval(group_names_str) if group_names_str else []
group_ocids = []
for group_name in group_names:
group_obj = next(
(
g
for g in self.groups
if g.get('group_name') == group_name
and g.get('domain_name') == user.get('domain_name')
),
None,
)
if group_obj:
group_ocids.append(group_obj.get('group_ocid', ''))
user['groups'] = group_ocids
logger.info(f'Loaded user: {user}')
self.users.append(user)
logger.info(f'Loaded {len(self.users)} users')
else:
logger.info('Skipping load of users due to load_all_users=False')
# -- Step 5: Load Compartments ---
compartments_file = os.path.join(dir_path, 'raw_data_identity_compartments.csv')
with open(compartments_file, encoding='utf-8') as f:
reader = csv.DictReader(f)
# Iterate compartments and add to list
for comp_item in reader:
compartment = {
'id': comp_item.get('id') or '',
'name': comp_item.get('name') or '',
'hierarchy_path': None, # will be built later
'lifecycle_state': comp_item.get('lifecycle_state') or '',
'parent_id': comp_item.get('compartment_id') or '',
'description': comp_item.get('description') or '',
}
logger.debug(f'Processing compartment: {compartment}')
# Only add ACTIVE compartments
if compartment['lifecycle_state'] == 'ACTIVE':
self.compartments.append(compartment)
else:
logger.debug(
f"Skipping compartment {compartment['name']} with lifecycle state {compartment['lifecycle_state']}"
)
# For some reason the root compartment is not included - add it manually
root_compartment = {
'id': self.tenancy_ocid,
'name': 'ROOT',
'hierarchy_path': None,
'lifecycle_state': 'ACTIVE',
'parent_id': '',
}
self.compartments.append(root_compartment)
logger.debug(f'Loaded {len(self.compartments)} compartments')
# Now build hierarchy paths for each compartment
for comp in self.compartments:
logger.info(f"Building path for compartment {comp.get('name','n/a')}")
comp['hierarchy_path'] = self._get_hierarchy_path_for_compartment(comp, '')
logger.info('Built hierarchy paths for compartments')
# Debug just the compartment name and path for all compartments
for comp in self.compartments:
logger.info(f"Compartment: {comp.get('name','n/a')} Path: {comp.get('hierarchy_path','n/a')}")
# --- Step 6: Load Policies ---
compartments_file = os.path.join(dir_path, 'raw_data_identity_policies.csv')
with open(compartments_file, encoding='utf-8') as f:
reader = csv.DictReader(f)
for policy_item in reader:
# Create a Policy object for the Policy itself
policy_obj = BasePolicy(
policy_name=policy_item.get('name') or '',
policy_ocid=policy_item.get('id') or '',
compartment_ocid=policy_item.get('compartment_id') or '',
description=policy_item.get('description') or '',
creation_time='',
)
logger.debug(f'Processing policy: {policy_obj}')
# Not really appending policies itself right now, use for parsing statements though
self.policies.append(policy_obj)
# Look up the compartment path in loaded compartments
comp_path = next(
(
comp['hierarchy_path']
for comp in self.compartments
if comp['id'] == policy_item.get('compartment_id')
),
'ROOT',
)
# Get the basic details here and then iterate statements - those are to be added to the list
policy_ocid = policy_item.get('identifier') or ''
comp_id = policy_item.get('compartment_id') or ''
policy_name = policy_item.get('name') or ''
creation_time = policy_item.get('time_created') or ''
# Statements needs to be a list of strings, but appears like a single string in CSV. example:
# ['allow group iam_tag_group to inspect all-resources in tenancy', 'allow group iam_tag_group read instances in tenancy', 'allow group iam_tag_group to read load-balancers in tenancy', 'allow group iam_tag_group to read buckets in tenancy', 'allow group iam_tag_group to read nat-gateways in tenancy', 'allow group iam_tag_group to read public-ips in tenancy', 'allow group iam_tag_group to read file-family in tenancy', 'allow group iam_tag_group to read instance-configurations in tenancy', 'allow group iam_tag_group to read network-security-groups in tenancy', 'allow group iam_tag_group to read capture-filters in tenancy', 'allow group iam_tag_group to read resource-availability in tenancy', 'allow group iam_tag_group to read audit-events in tenancy', 'allow group iam_tag_group to read users in tenancy', 'allow group iam_tag_group to use cloud-shell in tenancy', 'allow group iam_tag_group to read vss-family in tenancy', 'allow group iam_tag_group to read usage-budgets in tenancy', 'allow group iam_tag_group to read usage-reports in tenancy', 'allow group iam_tag_group to read data-safe-family in tenancy', 'allow group iam_tag_group to read vaults in tenancy', 'allow group iam_tag_group to read keys in tenancy', 'allow group iam_tag_group to read tag-namespaces in tenancy', 'allow group Aiam_tag_group to use ons-family in tenancy where any {request.operation!=/Create*/, request.operation!=/Update*/, request.operation!=/Delete*/, request.operation!=/Change*/}']
statements = eval(policy_item.get('statements') or '[]')
logger.debug(f'Policy {policy_name} has {len(statements)} statements')
# Iterate each statement, determine type, and proceed to parse
for statement_text in statements:
# DO NOT lowercase statement text - preserve original case
stripped_statement = statement_text.strip()
base_policy_statement: BasePolicyStatement = BasePolicyStatement(
policy_name=policy_name,
policy_ocid=policy_ocid,
# policy_description=policy_item.get('description') or '',
compartment_ocid=comp_id,
compartment_path=comp_path,
statement_text=stripped_statement,
creation_time=creation_time,
internal_id=hashlib.md5((stripped_statement + '' + policy_ocid).encode()).hexdigest(),
parsed=False,
)
logger.debug(f'Processing statement: {statement_text}')
st_text_lower = stripped_statement.lower()
# Parse the statement now - cannot use the existing parser as is because it relies on OCI clients
if st_text_lower.startswith('define'):
# Parse as DefineStatement
define_statement: DefineStatement = DefineStatement(**base_policy_statement)
if not self._parse_define_statement(policy_obj, define_statement):
logger.debug(f'Define statement was unable to parse: {statement_text}')
logger.debug(f'Parsed define statement: {define_statement}')
# Admit and Deny Admit
elif st_text_lower.startswith('admit') or st_text_lower.startswith('deny admit'):
admit_statement: AdmitStatement = AdmitStatement(**base_policy_statement)
if not self._parse_admit_statement(policy_obj, admit_statement):
logger.debug(f'Admit statement was unable to parse: {statement_text}')
logger.debug(f'Parsed admit statement: {admit_statement}')
# Endorse Statement
elif st_text_lower.startswith('endorse'):
endorse_statement: EndorseStatement = EndorseStatement(**base_policy_statement)
if not self._parse_endorse_statement(policy_obj, endorse_statement):
logger.debug(f'Endorse statement was unable to parse: {statement_text}')
logger.debug(f'Parsed endorse statement: {endorse_statement}')
else:
# Regular Policy Statement
regular_statement: RegularPolicyStatement = RegularPolicyStatement(**base_policy_statement)
parsed_statement_valid = self._parse_statement(policy_obj, regular_statement)
if not parsed_statement_valid:
logger.warning(f'Invalid policy statement detected: {statement_text}')
logger.debug(f'Parsed regular policy statement: {regular_statement}')
logger.info(f'Loaded {len(self.regular_statements)} policy statements')
self.data_as_of = datetime.now(UTC).isoformat()
self.loaded_from_compliance_output = True
# logger.warning(f"on_policy_statements_updated callback failed: {e}")
logger.info('Compliance output data loaded successfully.')
return True
except Exception as e:
# Show stack trace for debugging
import traceback
traceback.print_exc()
logger.error(f'Compliance output data load failed: {e}')
return False