##########################################################################
# Copyright (c) 2024, Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
#
# DISCLAIMER This is not an official Oracle application, It does not supported by Oracle Support.
#
# cli.py
#
# @author: Andrew Gregory
#
# Supports Python 3.12 and above
#
# coding: utf-8
##########################################################################
import argparse
import time
import warnings
from oci_policy_analysis.common.caching import CacheManager
from oci_policy_analysis.common.helpers import (
for_display_policy,
)
from oci_policy_analysis.common.logger import get_logger, set_log_level
from oci_policy_analysis.common.models import PolicySearch
from oci_policy_analysis.logic.data_repo import PolicyAnalysisRepository
from oci_policy_analysis.logic.policy_intelligence import PolicyIntelligenceEngine
# Suppress DeprecationWarnings from libraries
warnings.filterwarnings('ignore', category=DeprecationWarning)
[docs]
def main(): # noqa: C901
"""
Entry point for the OCI Policy and Dynamic Group Viewer CLI.
Parses command-line arguments to load, filter, display, or export OCI identity and policy information
from Oracle Cloud Infrastructure (OCI) using cached or live data.
Parameters
----------
--verbose : bool
Enable verbose logging.
--app-log : bool
Log output to app.log instead of console.
--instance-principal : bool
Use instance principal authentication.
--get-caches : str
List available caches for the given tenancy.
--print-all : bool
Print all policies and dynamic groups.
--recursive : bool
Recursively load policies across all compartments.
--use-cache : str, optional
Load data from a specified combined cache file.
--dont-save-cache-after-load : bool
Prevent saving a new combined cache after loading from OCI.
--profile : str
OCI CLI profile to use (default ``DEFAULT``).
--filter-json : str, optional
A JSON filter expression for policies.
--export-json : str, optional
Write collected data to a JSON file.
Usage Examples:
To print all policies and dynamic groups using the ADMIN profile with verbose logging:
``python -m oci_policy_analysis.cli --verbose --profile ADMIN --print-all``
To list available caches for a tenancy named "example-tenancy":
``python -m oci_policy_analysis.cli --get-caches example-tenancy``
To load data from a specific cache and export to JSON:
``python -m oci_policy_analysis.cli --use-cache 2024-11-17T10-43-08+00-00``
To filter policies with a JSON expression:
``python -m oci_policy_analysis.cli --filter-json '{"Subject": "group1", "Verb": "read"}'``
Returns:
None. Provides console output and/or writes files as specified.
"""
parser = argparse.ArgumentParser(description='OCI Policy and Dynamic Group Viewer CLI')
parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
parser.add_argument('--app-log', action='store_true', help='Enable app.log for logging (default is console)')
parser.add_argument('--instance-principal', action='store_true', help='Use instance principal authentication')
parser.add_argument('--get-caches', help='If set, provide the names of tenancy to search for caches')
parser.add_argument('--print-all', help='Print all of the policies and DGs to screen', action='store_true')
parser.add_argument(
'--recursive', help='Recursive Load across all compartments', action='store_true', default=False
)
parser.add_argument('--use-cache', help='provide the combined cache date to use', required=False, default=None)
parser.add_argument(
'--load-from-compliance',
help='provide the directory containing compliance output CSVs',
required=False,
default=None,
)
parser.add_argument(
'--dont-save-cache-after-load',
help='Do not save the combined cache after loading from OCI',
action='store_true',
)
parser.add_argument('--profile', default='DEFAULT', help='OCI CLI profile to use (default: DEFAULT)')
parser.add_argument('--filter-json', help='JSON string with filter criteria', type=str, default=None)
parser.add_argument(
'--export-json',
help='Export everything that is collected (Policies, Dynamic Groups, Users, Groups, Cross-tenancy Policies) to provided JSON file',
)
args = parser.parse_args()
# Logging and Console setup
if args.app_log:
# Reconfigure logger to use console
logger = get_logger(component='cli')
logger.info('Logging to app.log')
else:
logger = get_logger(component='cli')
logger.info('Logging to Console')
# Configure logging based on verbose flag
if args.verbose:
set_log_level('DEBUG')
# logger.setLevel('DEBUG')
logger.debug('Verbose logging enabled')
# Initialize PolicyCompartmentAnalysis
policy_analysis = PolicyAnalysisRepository()
cache_manager = CacheManager()
# 1. Load from compliance CSVs if requested
if args.load_from_compliance:
logger.info(f'[CLI] Loading compliance output from directory: {args.load_from_compliance}')
result = policy_analysis.load_from_compliance_output_dir(args.load_from_compliance)
logger.info(f'[CLI] Compliance output load success: {result}')
logger.info(
f'[CLI] Entities loaded: {len(policy_analysis.identity_domains)} identity_domains, '
f'{len(policy_analysis.dynamic_groups)} dynamic_groups, {len(policy_analysis.users)} users, '
f'{len(policy_analysis.groups)} groups, {len(policy_analysis.compartments)} compartments, '
f'{len(policy_analysis.regular_statements)} policy statements'
)
# ---- Policy Intelligence step (CLI) ----
logger.info('[CLI] Running minimal post-load policy intelligence')
t0 = time.perf_counter()
try:
policy_intel = PolicyIntelligenceEngine(policy_analysis)
policy_intel.calculate_all_effective_compartments()
policy_intel.find_invalid_statements()
policy_intel.run_dg_in_use_analysis()
except Exception as exc:
logger.warning(f'[CLI] Post-load policy intelligence raised exception: {exc}')
t1 = time.perf_counter()
logger.info(f'[CLI] Post-load policy intelligence completed in {t1 - t0:.2f}s')
# ----------------------------------------
else:
# Just show caches and quit
if args.get_caches: # If get_caches is provided, list available caches
available_caches = cache_manager.get_available_cache(tenancy_name=args.get_caches)
if available_caches:
logger.info('Available caches:')
for cache in available_caches:
logger.info(cache)
else:
logger.info('No caches available.')
logger.info('Exiting after listing caches as --get-caches was provided')
exit(0)
# Load everything from named cache
if args.use_cache:
if not cache_manager.load_combined_cache(named_cache=args.use_cache, policy_analysis=policy_analysis):
logger.error('Failed to load combined cache')
exit(2)
else:
if not policy_analysis.initialize_client(
use_instance_principal=args.instance_principal,
profile=args.profile,
recursive=True if args.recursive else False,
):
logger.error('Failed to initialize PolicyCompartmentAnalysis client')
exit(2)
logger.info(f'Initialized PolicyCompartmentAnalysis client for tenancy: {policy_analysis.tenancy_name}')
if not policy_analysis.load_complete_identity_domains():
logger.error('Failed to load identity domains, groups, and users from OCI')
exit(2)
if not policy_analysis.load_policies_and_compartments():
logger.error('Failed to load policies and compartments from OCI')
exit(2)
# Completed the Load
logger.info(f'Loaded policies and compartments for tenancy: {policy_analysis.tenancy_name}')
if not args.dont_save_cache_after_load:
logger.info('Saving combined cache after loading from OCI')
cache_manager.save_combined_cache(policy_analysis=policy_analysis)
# ---- Policy Intelligence step (CLI) ----
logger.info('[CLI] Running minimal post-load policy intelligence')
t0 = time.perf_counter()
try:
policy_intel = PolicyIntelligenceEngine(policy_analysis)
policy_intel.calculate_all_effective_compartments()
policy_intel.find_invalid_statements()
policy_intel.run_dg_in_use_analysis()
except Exception as exc:
logger.warning(f'[CLI] Post-load policy intelligence raised exception: {exc}')
t1 = time.perf_counter()
logger.info(f'[CLI] Post-load policy intelligence completed in {t1 - t0:.2f}s')
# ----------------------------------------
# Print some basic details
logger.info('-' * 80)
logger.info(f'Tenancy Name: {policy_analysis.tenancy_name}')
logger.info(f'Tenancy OCID: {policy_analysis.tenancy_ocid}')
logger.info(f'Data As Of: {policy_analysis.data_as_of}')
logger.info('-' * 80)
# Apply JSON filter if provided
if args.filter_json:
filter: PolicySearch = eval(args.filter_json)
logger.info(f'Applying filter: {filter}')
# filtered_statements = policy_analysis.filter_policy_statements_json(filters=filter)
filtered_statements = policy_analysis.filter_policy_statements(filters=filter)
logger.info(f'Filtered down to {len(filtered_statements)} policy statements:')
for i, stmt in enumerate(filtered_statements, start=1):
stmt = for_display_policy(stmt)
logger.info(f'{i} Policy Name: {stmt.get("Policy Name")} | Statement: {stmt.get("Statement Text")}')
logger.info('-' * 80)
# Export JSON if needed
if args.export_json:
logger.info(f'The file is called {args.export_json}')
# Save to provided file
file_object = open(args.export_json, 'w')
cache_file_name = cache_manager.save_combined_cache(export_file=file_object, policy_analysis=policy_analysis)
logger.info(f'Wrote combined cache to {cache_file_name}')
logger.info('-' * 80)
elif args.print_all:
# Print regular policies
logger.info('\nRegular Policies:')
# Use helper to print nicely
for stmt in policy_analysis.regular_statements:
stmt = for_display_policy(stmt)
logger.info(f'Policy Name: {stmt.get("Policy Name")}')
logger.info(f'Statement: {stmt.get("Statement Text")}')
logger.info(f'Compartment Hierarchy: {stmt.get("Policy Compartment")}')
if stmt.get('parsed'):
logger.info(f'Subject Type: {stmt.get("Subject Type")}')
logger.info(f'Subject: {stmt.get("Subject")}')
logger.info(f'Verb: {stmt.get("Verb")}')
logger.info(f'Resource: {stmt.get("Resource")}')
logger.info(f'Permission: {stmt.get("Permission")}')
logger.info(f'Location Type: {stmt.get("Location Type")}')
logger.info(f'Location: {stmt.get("Location")}')
logger.info(f'Condition: {stmt.get("Condition")}')
logger.info(f'Comment: {stmt.get("Comment")}')
logger.info(f'Created: {stmt.get("Creation Time")}')
else:
logger.info('Statement could not be parsed into components')
logger.info('-' * 80)
# Print cross-tenancy policies
logger.info('\nCross-Tenancy Policies:')
logger.info('-' * 80)
for stmt in policy_analysis.cross_tenancy_statements:
stmt = for_display_policy(stmt)
logger.info(f'Policy Name: {stmt.get("Policy Name")}')
logger.info(f'Statement: {stmt.get("Statement Text")}')
logger.info(f'Compartment Hierarchy: {stmt.get("Policy Compartment")}')
logger.info('-' * 80)
# Print dynamic groups
logger.info('\nDynamic Groups:')
logger.info('-' * 80)
for dg in policy_analysis.dynamic_groups:
# for_display_dynamic_group expects a class or TypedDict, but compliance loads produce plain dicts
# Defensive: pass only keys that exist in both
safe_keys = [
'domain_name',
'dynamic_group_name',
'description',
'matching_rule',
'in_use',
'dynamic_group_ocid',
'creation_time',
]
dg_struct = {k: dg.get(k) for k in safe_keys}
logger.info(f'Domain: {dg_struct.get("domain_name")}')
logger.info(f'Dynamic Group Name: {dg_struct.get("dynamic_group_name")}')
logger.info(f'Description: {dg_struct.get("description")}')
logger.info(f'Matching Rule: {dg_struct.get("matching_rule")}')
logger.info(f'In Use: {dg_struct.get("in_use")}')
logger.info(f'OCID: {dg_struct.get("dynamic_group_ocid")}')
logger.info(f'Created: {dg_struct.get("creation_time")}')
logger.info('-' * 80)
# Print summary counts
# Summary
logger.info('-' * 80)
logger.info(f'Total Regular Policies: {len(policy_analysis.regular_statements)}')
logger.info(f'Total Cross-Tenancy Policies: {len(policy_analysis.cross_tenancy_statements)}')
logger.info(f'Total Dynamic Groups: {len(policy_analysis.dynamic_groups)}')
logger.info(f'Total Identity Domains: {len(policy_analysis.identity_domains)}')
logger.info(f'Total Groups: {len(policy_analysis.groups)}')
logger.info(f'Total Users: {len(policy_analysis.users)}')
if __name__ == '__main__':
main()