Source code for oci_policy_analysis.logic.reference_data_repo

##########################################################################
# Copyright (c) 2024, Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
# DISCLAIMER This is not an official Oracle application, It does not supported by Oracle Support.
#
# reference_data_repo.py
#
# @author: Andrew Gregory
#
# Supports Python 3.12 and above
#
# coding: utf-8
##########################################################################

import glob
import json
import os

from oci_policy_analysis.common.logger import get_logger

logger = get_logger(component='reference_data_repo')


[docs] class ReferenceDataRepo: """ Repository for reference data on resources, families, and permissions. Loads from JSON files in a specified directory. JSON Structure Example: { "resources": { "resource_name": { "verbs": { "inspect": ["permission1", "permission2"], "read": ["permission3"], "use": ["permission4"], "manage": ["permission5"] } }, ... }, "families": { "family_name": { "resources": ["resource_name1", "resource_name2"], "source_url": "http://example.com/source" }, ... } } Once all files are loaded, provides methods to query permissions and check overlaps. """ def __init__(self, json_dir='permissions'): self.json_dir = os.path.join(os.path.dirname(__file__), json_dir) # Always define keys needed by consumers, even if load_data hasn't run yet self.data = {'resources': {}, 'families': {}} self.resource_name_map = {} self.family_name_map = {} self.verb_set = {'inspect', 'read', 'use', 'manage'}
[docs] def load_data(self): logger.info(f'Loading reference data from directory: {self.json_dir}') self.data = {'resources': {}, 'families': {}} files_loaded = 0 # Store operations for all loaded files in new field (flat) self.data['operations'] = {} # New: Also store a grouped operations structure for API/source display (`operations_by_api`) self.data['operations_by_api'] = {} # Per-verb risk weights: each permission is scored by the verb it belongs to (exposure points) verb_risk = {'inspect': 1, 'read': 5, 'use': 50, 'manage': 100} for file_path in glob.glob(os.path.join(self.json_dir, '*.json')): logger.debug(f'Loading reference data file: {file_path}') try: with open(file_path) as f: file_data = json.load(f) debug_resources = file_data.get('resources', {}) debug_families = file_data.get('families', {}) debug_operations = file_data.get('operations', {}) # Inject risk score into resources-per-verb-permission for _res_name, resdata in debug_resources.items(): verbs = resdata.get('verbs', {}) for verb, perms in verbs.items(): risk = verb_risk.get(verb, 1) if not isinstance(perms, list): continue # Store per-resource dictionary so we can find the risk for a permission for perm in perms: resdata.setdefault('permission_risks', {})[perm.upper()] = risk logger.debug( f'File {file_path}: contains {len(debug_resources)} resources, {len(debug_families)} families, {len(debug_operations)} operations' ) self.data['resources'].update(debug_resources) self.data['families'].update(debug_families) # Determine api_name from filename (basename, no extension) api_name = os.path.splitext(os.path.basename(file_path))[0] if debug_operations: self.data['operations'].update(debug_operations) # group by api_name: {op_name: op_data + 'api_name': ...} ops = {} for op_name, meta in debug_operations.items(): meta_copy = dict(meta) # don't mutate input meta_copy['api_name'] = api_name ops[op_name] = meta_copy self.data['operations_by_api'][api_name] = ops logger.debug( f'File {file_path} loaded/merged. Cumulative resources: {len(self.data["resources"])}, families: {len(self.data["families"])}, operations: {len(self.data["operations"])}, operations_by_api: {len(self.data["operations_by_api"])}' ) files_loaded += 1 except Exception as e: logger.error(f'Error loading {file_path}: {e}') logger.info( f'Loaded {files_loaded} reference data files. Total resources: {len(self.data["resources"])}, families: {len(self.data["families"])}, operations: {len(self.data["operations"])}, operations_by_api: {len(self.data["operations_by_api"])}' ) # Create case-insensitive maps for resources and families self.resource_name_map = {k.lower(): k for k in self.data['resources'].keys()} self.family_name_map = {k.lower(): k for k in self.data['families'].keys()} self.verb_set = {'inspect', 'read', 'use', 'manage'}
[docs] def get_permission_risk(self, permission: str, resource: str = None): """ Get the risk score for a single permission string (optionally for a given resource). If resource is not provided, search all resources. This method is case-insensitive for permission and resource. """ perm = permission.upper() resource_key = None if resource: resource_key = self.resource_name_map.get(resource.lower()) if resource_key and resource_key in self.data['resources']: risk = self.data['resources'][resource_key].get('permission_risks', {}).get(perm) if risk is not None: return risk # Search all resources if resource not given or not found for resdata in self.data['resources'].values(): risk = resdata.get('permission_risks', {}).get(perm) if risk is not None: return risk return 1 # default fall-back risk score
[docs] def get_permissions_risk_sum(self, permissions, resource: str = None): """ Given a list of permissions (case-insensitive), compute the summed risk score. """ total = 0 for perm in permissions: total += self.get_permission_risk(perm, resource) return total
[docs] def get_verb_resource_risk(self, verb: str, resource: str): """ Get cumulative permission risk for all permissions associated with given verb/resource. """ perms = self.get_permissions(resource, verb) return self.get_permissions_risk_sum(perms, resource)
[docs] def get_permissions(self, entity, verb, action='allow'): """ Get cumulative permissions for a resource or family at a given verb level and action. For "allow": behavior is as before. For "deny": logic is inverted -- broader verbs (like 'inspect') deny more permissions. Special case: if entity == 'all-resources', gather permissions from *all* resources/types, but for the specified verb only. For 'allow', union the specific-verb permissions from all. For 'deny', union the same but these are what is DENIED. Args: entity (str): Resource name or family name (case-insensitive). verb (str): Verb level ('inspect', 'read', 'use', 'manage'), case-insensitive. action (str): "allow" or "deny" (default: "allow") Returns: list: List of cumulative permissions, always UPPERCASE. """ # Normalize inputs entity_ci = (entity or '').lower() verb_ci = (verb or '').lower() # Special entity: all-resources should follow the same cumulative # allow/deny semantics as individual resources and families. # We therefore aggregate the cumulative permissions from every # known resource instead of looking only at the single verb. if entity_ci == 'all-resources': all_perms = set() for res_name in self.data['resources'].keys(): perms = self._get_cumulative_permissions(res_name, verb_ci, action) if perms: all_perms.update(perms) return list(all_perms) # Handle families/resources with case-insensitive lookups if entity_ci in self.family_name_map: fam_key = self.family_name_map[entity_ci] all_perms = set() for res in self.data['families'][fam_key]['resources']: perms = self._get_cumulative_permissions(res, verb_ci, action) if perms: all_perms.update(perms) return [p.upper() for p in all_perms] else: # Resource lookup res_key = self.resource_name_map.get(entity_ci, entity) perms = self._get_cumulative_permissions(res_key, verb_ci, action) if perms: return [p.upper() for p in perms] return []
def _get_cumulative_permissions(self, resource, verb, action='allow'): # Case-insensitive resource and verb lookup resource_ci = (resource or '').lower() verb_ci = (verb or '').lower() resource_key = self.resource_name_map.get(resource_ci, resource) verbs_order = ['inspect', 'read', 'use', 'manage'] try: index = [v.lower() for v in verbs_order].index(verb_ci) except ValueError: return None if resource_key not in self.data['resources']: return None perms = [] if action == 'deny': # For deny, we deny verb and everything MORE powerful (up the privilege ladder) for v in [v.lower() for v in verbs_order][index:]: perms.extend(self.data['resources'][resource_key]['verbs'].get(v, [])) else: # For allow, we allow verb and everything LESS powerful for v in [v.lower() for v in verbs_order][: index + 1]: perms.extend(self.data['resources'][resource_key]['verbs'].get(v, [])) return list({p.upper() for p in perms})
[docs] def check_overlap(self, perm_set1, perm_set2): """ Check for overlapping permissions between two permission sets. Uses 2 lists of permissions. Always compares and returns upper case permissions (display, logic, and reporting). """ if not perm_set1 or not perm_set2: return [] overlap = {p.upper() for p in perm_set1} & {p.upper() for p in perm_set2} return list(overlap)
[docs] def check_overlap_params(self, entity1, verb1, action1, entity2, verb2, action2): """ Check overlapped permissions by specifying both sides as entity/verb/action. Args: entity1 (str), verb1 (str), action1 (str) entity2 (str), verb2 (str), action2 (str) Returns: list: List of overlapping permissions. """ perms1 = self.get_permissions(entity1, verb1, action1) perms2 = self.get_permissions(entity2, verb2, action2) return self.check_overlap(perms1, perms2)
[docs] def get_source(self, entity): """ Retrieve the source URL(s) for a given entity (resource or family) in a case-insensitive manner. """ sources = set() entity_ci = (entity or '').lower() fam_key = self.family_name_map.get(entity_ci) if fam_key and fam_key in self.data['families']: source_url = self.data['families'][fam_key].get('source_url', '') if source_url: sources.add(source_url) else: res_key = self.resource_name_map.get(entity_ci) if res_key: for _fam, fam_data in self.data['families'].items(): if res_key in fam_data['resources']: source_url = fam_data.get('source_url', '') if source_url: sources.add(source_url) return ', '.join(sources) if sources else ''
[docs] def has_api_operation_permissions(self, operation_name, granted_permissions): """ Check if all required permissions for the given API operation are present in the granted_permissions list. Args: operation_name (str): Name of the API operation (as in 'operations' node). granted_permissions (list[str]): List of permission strings to check. Returns: bool: True if all required permissions for the operation are present, False otherwise. """ # Find operation (case-sensitive key match) op_info = self.data.get('operations', {}).get(operation_name) if not op_info: logger.debug(f'API operation {operation_name!r} not found in reference data.') return False required = {p.upper() for p in op_info.get('permissions', [])} provided = {p.upper() for p in granted_permissions} missing = required - provided logger.debug( f'Checking permissions for op={operation_name!r}; required={required}, provided={provided}, missing={missing}' ) return not missing