Source code for ibm_watson_openscale.custom_monitor

# coding: utf-8

# Copyright 2025 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Optional

from ibm_cloud_sdk_core import BaseService
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import (
    ApplicabilitySelection, IntegratedSystems, MetricThreshold,
    MonitorInstanceSchedule, MonitorMetricRequest, MonitorRuntime,
    ScheduleStartTime, Target, MonitorTagRequest)
from ibm_watson_openscale.supporting_classes.enums import (
    MetricThresholdTypes, TargetTypes)
from ibm_watson_openscale.utils.utils import validate_type
from ibm_watson_openscale.mrm import ModelRiskManagement


class SetupCustomMonitor:
    """
    Helper that drives the custom monitor setup flow against an OpenScale client.

    Identifiers produced by the setup steps are accumulated in ``self.results``
    and the effective configuration is kept in ``self.config``.
    """

    def __init__(self, ai_client: BaseService, service_url: str) -> None:
        """
        Store the client handle and start with empty results/configuration.

        :param BaseService ai_client: Client used for all OpenScale calls.
        :param str service_url: Service URL, stored as-is on the instance.
        """
        # NOTE(review): the trailing True presumably marks the argument as
        # mandatory - confirm against validate_type's signature.
        validate_type(ai_client, 'ai_client', BaseService, True)

        self._ai_client = ai_client
        self.service_url = service_url
        # Separate dict objects: results are filled by the setup steps,
        # config by setup_configuration / setup_llm_judge_configuration.
        self.results, self.config = {}, {}

        logging.info("SetupCustomMonitor instance initialized.")
def setup_configuration(self, config, function_code):
    """
    Run the end-to-end custom monitor setup described by ``config``.

    The user configuration is layered over built-in defaults, token
    generation details for the custom metrics provider are filled in for the
    selected environment (CPD when ``CPD_INFO`` is present, AWS when
    ``AWS_AUTH_ENABLED`` is truthy, IBM Cloud otherwise), a watsonx.ai client
    is created, and then the integrated system, custom monitor, monitor
    instance and custom dataset are set up in that order.

    :param dict config: Setup configuration. ``SPACE_ID`` is required and
        ``SUBSCRIPTION_ID`` is read by the later steps. ``CLOUD_API_KEY`` is
        required unless ``CPD_INFO`` is supplied, in which case it only acts
        as a fallback for ``CPD_INFO["API_KEY"]``.
    :param function_code: Passed through unchanged to
        ``_setup_integrated_system``.
    :return: ``self.results`` with the identifiers collected during setup.
    :rtype: dict
    :raises ValueError: If required configuration values are missing or the
        CPD URL has no scheme.
    :raises Exception: Any error raised during the setup steps is logged and
        re-raised.
    """
    try:
        from ibm_watsonx_ai import APIClient
    except Exception:
        logging.error("ibm_watsonx_ai package is not available, Please install the latest version ")
        raise

    logging.basicConfig(level=logging.INFO)

    cloud_api_key = config.get('CLOUD_API_KEY')
    is_cpd = "CPD_INFO" in config
    is_aws = config.get("AWS_AUTH_ENABLED", False)

    if is_cpd:
        cpd_info = config.get("CPD_INFO")
        if not cpd_info:
            raise ValueError("Missing 'CPD_INFO' in configuration.")

        # CPD_INFO values win; the top-level keys are only fallbacks.
        cpd_url = cpd_info.get("CPD_URL") if cpd_info.get("CPD_URL") else config.get("OPENSCALE_API_URL")
        cpd_password = cpd_info.get("API_KEY") if cpd_info.get("API_KEY") else cloud_api_key
        cpd_username = cpd_info.get("USERNAME")
        cpd_version = cpd_info.get("VERSION")

        fields = {
            "CPD_URL": cpd_url,
            "USERNAME": cpd_username,
            "API_KEY": cpd_password,
            "VERSION": cpd_version,
        }
        missing_fields = [name for name, value in fields.items() if not value]
        if missing_fields:
            raise ValueError(f"Missing required CPD configuration fields: {', '.join(missing_fields)}")

        from urllib.parse import urlparse
        if not urlparse(cpd_url).scheme:
            raise ValueError("Invalid CPD_URL: Must be a valid URL.")

        cpd_config = {
            "token_info": {
                "url": "{}/icp4d-api/v1/authorize".format(cpd_url),
                "headers": {
                    "Content-Type": "application/json",
                    "Accept": "application/json"
                },
                "payload": {
                    "username": cpd_username,
                    "api_key": cpd_password,
                },
                "method": "post"
            }
        }
    elif not cloud_api_key:
        # Outside CPD the key is needed both for the IAM token payload and for
        # the WML client; fail with a clear message instead of a TypeError
        # from the string concatenation below.
        raise ValueError("'CLOUD_API_KEY' is missing in the configuration")

    defaults = {
        "DEPLOYMENT_NAME": "Custom Metrics Provider Deployment",
        "PYTHON_FUNCTION_NAME": "Custom Metrics Provider Function",
        "WML_URL": "https://us-south.ml.cloud.ibm.com",
        "IAM_URL": "https://iam.cloud.ibm.com/oidc/token",
        "CUSTOM_METRICS_PROVIDER_NAME": "Custom Metrics Provider",
        "CUSTOM_MONITOR_NAME": "Sample Model Performance",
        "OPENSCALE_API_URL": "https://api.aiopenscale.cloud.ibm.com",
        "DATAMART_ID": "00000000-0000-0000-0000-000000000000",
        "CUSTOM_METRICS_WAIT_TIME": 60,
        "DEPLOYMENT_TYPE": "wml_online",
        "RUNTIME_ENV": "runtime-25.1-py3.12",
        "ENABLE_SCHEDULE": False,
        "SCHEDULE": {
            "repeat_interval": 1,
            "repeat_type": "hour",
            "delay_unit": "minute",
            "delay_time": 5
        },
        "DELETE_CUSTOM_MONITOR": True,
        "DELETE_INTEGRATED_SYSTEM": True,
        "DELETE_CUSTOM_MONITOR_INSTANCE": True,
        "INPUT_DATA_TYPES": ["structured", "unstructured_text", "unstructured_image", "chat"],
        "ALGORITHM_TYPES": ["binary", "regression", "multiclass", "question_answering",
                            "summarization", "retrieval_augmented_generation",
                            "classification", "generation", "extraction"],
        "TAGS": [
            {
                "name": "region",
                "TAG_DESCRIPTION": "customer geographical region"
            }
        ],
        "CUSTOM_METRICS_PROVIDER_CREDENTIALS": {
            "auth_type": "bearer"
        },
        "token_info": {
            "url": "https://iam.cloud.ibm.com/identity/token",
            "headers": {"Content-type": "application/x-www-form-urlencoded"},
            # In CPD mode the key may legitimately be absent (it can come from
            # CPD_INFO instead); this default payload is not used for the CPD
            # credentials, so an empty suffix is harmless there.
            "payload": "grant_type=urn:ibm:params:oauth:grant-type:apikey&response_type=cloud_iam&apikey=" + (cloud_api_key or ""),
            "method": "POST"
        }
    }
    # User-supplied values override the defaults (shallow merge).
    config = {**defaults, **config}

    if is_cpd:
        creds = config.setdefault("CUSTOM_METRICS_PROVIDER_CREDENTIALS", {})
        token_info = creds.setdefault("token_info", cpd_config.get("token_info", {}))
        config["WML_URL"] = config["OPENSCALE_API_URL"]
        token_info.setdefault("url", f"{config['OPENSCALE_API_URL']}/icp4d-api/v1/authorize")
        payload = token_info.setdefault("payload", {})
        payload.setdefault("api_key", config.get("CLOUD_API_KEY"))
    elif is_aws:
        # Configure AWS/MCSP token generation for custom metrics provider
        # The MCSP token endpoint expects a JSON payload and returns {"token": "..."}
        # token_field_name tells the OpenScale backend which field in the response contains the token
        aws_iam_url = config.get("AWS_IAM_URL", "https://account-iam.platform.saas.ibm.com/api/2.0/apikeys/token")
        creds = config.setdefault("CUSTOM_METRICS_PROVIDER_CREDENTIALS", {})
        creds["auth_type"] = "bearer"
        creds["token_info"] = {
            "url": aws_iam_url,
            "headers": {"Content-type": "application/json"},
            "payload": {"apikey": config.get("CLOUD_API_KEY")},
            "method": "POST",
            "token_field_name": "token"
        }
        # Override WML URL with AWS-specific URL if provided
        if config.get("AWS_WML_URL"):
            config["WML_URL"] = config["AWS_WML_URL"]
        if config.get("AWS_OPENSCALE_API_URL"):
            config["OPENSCALE_API_URL"] = config["AWS_OPENSCALE_API_URL"]
    else:
        if "token_info" not in config.get("CUSTOM_METRICS_PROVIDER_CREDENTIALS", {}):
            config["CUSTOM_METRICS_PROVIDER_CREDENTIALS"]["token_info"] = config["token_info"]

    # Suffix deployment and function names with the subscription ID when known.
    if config.get("DEPLOYMENT_NAME") and config.get("SUBSCRIPTION_ID"):
        config["DEPLOYMENT_NAME_PREFIX"] = str(config.get("DEPLOYMENT_NAME")) + "_" + str(config.get("SUBSCRIPTION_ID"))
        config["PYTHON_FUNCTION_NAME"] = str(config.get("PYTHON_FUNCTION_NAME")) + "_" + str(config.get("SUBSCRIPTION_ID"))
    else:
        config["DEPLOYMENT_NAME_PREFIX"] = config.get("DEPLOYMENT_NAME")

    try:
        self.config = config
        if 'SPACE_ID' not in config or not config['SPACE_ID']:
            raise ValueError("'SPACE_ID' is missing in the configuration")

        print("Initialising Watson Machine Learning (WML) client.")
        if is_cpd:
            print("Initilising CPD WML")
            credentials = {
                "url": cpd_url,
                "username": cpd_username,
                "apikey": cpd_password,
                "instance_id": "openshift",
                "version": cpd_version
            }
            wml_client = APIClient(credentials)
        elif is_aws:
            print("Initilising AWS WML")
            from ibm_watsonx_ai import Credentials as WatsonxCredentials
            aws_wml_url = config.get("WML_URL")
            aws_platform_url = config.get("AWS_PLATFORM_URL", config.get("AWS_OPENSCALE_API_URL"))
            wml_credentials = WatsonxCredentials(
                url=aws_wml_url,
                api_key=config["CLOUD_API_KEY"],
                platform_url=aws_platform_url
            )
            wml_client = APIClient(credentials=wml_credentials)
        else:
            print("Initilising Cloud WML")
            wml_client = APIClient({
                "url": config["WML_URL"],
                "apikey": config["CLOUD_API_KEY"]
            })

        wml_client.set.default_space(config['SPACE_ID'])
        print(f"Default space set to {config['SPACE_ID']}")

        print("Setting up an Integration System for Custom Metrics Provider")
        integrated_system_id = self._setup_integrated_system(config, wml_client, function_code)
        print(integrated_system_id)

        # create_custom_monitor
        print("Creating custom monitor.")
        custom_monitor_id = self._create_custom_monitor(config)
        self._associate_integrated_system(integrated_system_id, custom_monitor_id)

        # _create_monitor_instance
        print("Creating monitor instance.")
        monitor_instance_id = self._create_monitor_instance(config, custom_monitor_id, integrated_system_id)
        print(monitor_instance_id)

        # Create custom dataset for record-level metrics
        print("Creating custom dataset for record-level metrics.")
        custom_dataset_info = self.create_custom_dataset(
            data_mart_id=config['DATAMART_ID'],
            subscription_id=config['SUBSCRIPTION_ID'],
            custom_monitor_id=custom_monitor_id
        )
        print(f"Custom dataset created with ID: {custom_dataset_info['custom_dataset_id']}")

        logging.info("Custom monitor setup completed successfully")
        return self.results
    except Exception as e:
        logging.error(f"Setup failed: {str(e)}")
        raise
def _cleanup_existing_deployment(self, wml_client, deployment_name): deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower() if deployment_type == "wml_online": try: print(f"Performing wml_online deployment Cleanup for: {deployment_name}") deployments = wml_client.deployments.get_details() for deployment in deployments["resources"]: if deployment["metadata"]["name"] == deployment_name: deployment_id = deployment["metadata"]["id"] print(f"Deleting wml_online deployment: {deployment_id} ") wml_client.deployments.delete(deployment_id) asset_id = deployment["entity"].get("asset", {}).get("id") if asset_id: print(f"Deleting associated asset: {asset_id}") wml_client.repository.delete(asset_id) except Exception as e: print(f"Error during wml_online deployment clenaup {deployment_name} : {e}") elif deployment_type == "wml_batch": try: print(f"Performing Batch deployment Cleanup for: {deployment_name}") deployments = wml_client.deployments.get_details() for deployment in deployments["resources"]: if deployment["metadata"]["name"] == deployment_name or deployment["metadata"]["name"] == deployment_name + '_WRAPPER': deployment_id = deployment["metadata"]["id"] print(f"Deleting Batch deployment: {deployment_id} ") wml_client.deployments.delete(deployment_id) asset_id = deployment["entity"].get("asset", {}).get("id") if asset_id: print(f"Deleting associated asset: {asset_id}") wml_client.repository.delete(asset_id) except Exception as e: print(f"Error during Batch deployment clenaup {deployment_name} : {e}") def _create_function(self, wml_client, config, code): deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower() software_spec_id = wml_client.software_specifications.get_id_by_name( config['RUNTIME_ENV']) try: function_props = { wml_client.repository.FunctionMetaNames.NAME: config['PYTHON_FUNCTION_NAME'], wml_client.repository.FunctionMetaNames.SOFTWARE_SPEC_ID: software_spec_id } function_artifact = wml_client.repository.store_function( 
meta_props=function_props, function=code) function_id = wml_client.repository.get_function_id(function_artifact) self.results["function_id"] = function_id return function_id except Exception as e: print(f"Error during create function of deployment type {deployment_type} : {e}") def _deploy_function(self, wml_client, config, function_id): deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower() try: self.hardware_spec_id = wml_client.hardware_specifications.get_id_by_name('M') if deployment_type == "wml_online": print(f"Deploy function as ONLINE : {config['DEPLOYMENT_NAME']}") deploy_meta = { wml_client.deployments.ConfigurationMetaNames.NAME: config["DEPLOYMENT_NAME_PREFIX"], wml_client.deployments.ConfigurationMetaNames.ONLINE: {}, wml_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": self.hardware_spec_id} } elif deployment_type == "wml_batch": print(f"Deploy function as BATCH : {config['DEPLOYMENT_NAME']}") deploy_meta = { wml_client.deployments.ConfigurationMetaNames.NAME: config["DEPLOYMENT_NAME_PREFIX"], wml_client.deployments.ConfigurationMetaNames.BATCH: {}, wml_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: { "name": "S", "num_nodes": 1} } deployment = wml_client.deployments.create(function_id, meta_props=deploy_meta) deployment_id = wml_client.deployments.get_uid(deployment) deployment_details = wml_client.deployments.get_details(deployment_id) created_at = deployment_details['metadata']['created_at'] current_date = created_at.split("T")[0] if "T" in created_at else "" if deployment_type == "wml_online": scoring_url = wml_client.deployments.get_scoring_href(deployment) scoring_url += f"?version={current_date}" elif deployment_type == "wml_batch": scoring_url = config['WML_URL'] + '/ml/v4/deployment_jobs?version='+current_date self.results.update({"deployment_id": deployment_id, "scoring_url": scoring_url}) return deployment_id, scoring_url except Exception as e: print(f"Error during Deploy function of deployment 
type {deployment_type} : {e}") def _setup_integrated_system(self, config, wml_client, function_code): # Create a unique name by appending subscription ID to the custom metrics provider name unique_provider_name = f"{config['CUSTOM_METRICS_PROVIDER_NAME']}_{config['SUBSCRIPTION_ID']}" print("CUSTOM_METRICS_PROVIDER_NAME:", unique_provider_name) scoring_url = None # Cleanup existing systems systems = IntegratedSystems(self._ai_client).list().result.integrated_systems for system in systems: if system.entity.name == unique_provider_name: if config['DELETE_INTEGRATED_SYSTEM']: print("delete_integrated_system is True") print(f"Cleaning up existing deployment {config['DEPLOYMENT_NAME']}.") self._cleanup_existing_deployment( wml_client, config["DEPLOYMENT_NAME_PREFIX"]) # Function Creation print("Creating custom function.") function_id = self._create_function( wml_client, config, function_code) # Deployment self.deployment_id, scoring_url = self._deploy_function( wml_client, config, function_id) IntegratedSystems(self._ai_client).delete( integrated_system_id=system.metadata.id) else: print("delete_integrated_system is False") self.results["integrated_system_id"] = system.metadata.id self.results["custom_metrics_provider_name"] = system.entity.name return system.metadata.id if scoring_url is None: # if you delete it from UI # Function Creation print("Creating custom function.") function_id = self._create_function( wml_client, config, function_code) # Deployment self.deployment_id, scoring_url = self._deploy_function( wml_client, config, function_id) # Create new integrated system with unique name system = self._ai_client.integrated_systems.add( name=unique_provider_name, description="Custom metrics provider system", type="custom_metrics_provider", credentials=config['CUSTOM_METRICS_PROVIDER_CREDENTIALS'], connection={ "display_name": unique_provider_name, "endpoint": scoring_url } ).result self.results["integrated_system_id"] = system.metadata.id 
self.results["custom_metrics_provider_name"] = system.entity.name return system.metadata.id def _create_custom_monitor(self, config): is_schedule = "SCHEDULE" in config if is_schedule: schedule_info = config["SCHEDULE"] REPEAT_INTERVAL = schedule_info.get("REPEAT_INTERVAL") or schedule_info.get("repeat_interval") REPEAT_TYPE = schedule_info.get("REPEAT_TYPE") or schedule_info.get("repeat_type") delay_unit = schedule_info.get("DELAY_UNIT") or schedule_info.get("delay_unit") delay_time = schedule_info.get("DELAY_TIME") or schedule_info.get("delay_time") # Step 1: Check if monitor already exists existing_monitors = self._ai_client.monitor_definitions.list().result.monitor_definitions for monitor in existing_monitors: if monitor.entity.name == config['CUSTOM_MONITOR_NAME']: if config.get('DELETE_CUSTOM_MONITOR', False): print(f"Deleting existing monitor: {monitor.entity.name}") # Delete associated custom dataset first self.delete_custom_dataset(monitor.metadata.id, config['SUBSCRIPTION_ID']) # Then delete the monitor self._ai_client.monitor_definitions.delete(monitor.metadata.id,background_mode=False) else: print(f"Reusing existing monitor: {monitor.entity.name}") self.results["custom_monitor_id"] = monitor.metadata.id return monitor.metadata.id # Step 2: Set applicability problem_type_selection = ApplicabilitySelection(problem_type=config['ALGORITHM_TYPES']) input_data_type_selection = ApplicabilitySelection(input_data_type=config['INPUT_DATA_TYPES']) # Step 3: Dynamically build metric definitions metrics = [] for metric_cfg in config.get('MONITOR_METRICS', []): metric_name = metric_cfg.get('name') thresholds = [] threshold_cfg = metric_cfg.get('thresholds', {}) for threshold_type, value in threshold_cfg.items(): if threshold_type.lower() == 'lower_limit': thresholds.append(MetricThreshold( type=MetricThresholdTypes.LOWER_LIMIT, default=value )) elif threshold_type.lower() == 'upper_limit': thresholds.append(MetricThreshold( type=MetricThresholdTypes.UPPER_LIMIT, 
default=value )) else: print(f"Warning: Unknown threshold type '{threshold_type}' for metric '{metric_name}'") metrics.append(MonitorMetricRequest( name=metric_name, applies_to=problem_type_selection, thresholds=thresholds )) # Step 4: Dynamically build tag definitions tags = [] tag_configurations = config.get('TAGS', []) for tag in tag_configurations: name = tag.get('name') description = tag.get('TAG_DESCRIPTION') # Validate if both name and description exist if name and description: tags.append(MonitorTagRequest(name=name, description=description)) else: print(f"Invalid tag configuration: {tag}") # Step 5: Optional scheduling if config.get('ENABLE_SCHEDULE', False): schedule = MonitorInstanceSchedule( repeat_interval=REPEAT_INTERVAL, repeat_unit=REPEAT_TYPE, start_time=ScheduleStartTime( type="relative", delay_unit=delay_unit, delay=delay_time) ) monitor_runtime = MonitorRuntime(type="custom_metrics_provider") else: schedule = None monitor_runtime = None # Step 6: Create the monitor monitor_def = self._ai_client.monitor_definitions.add( name=config['CUSTOM_MONITOR_NAME'], metrics=metrics, tags=tags, schedule=schedule, applies_to=input_data_type_selection, monitor_runtime=monitor_runtime, background_mode=False ).result # Step 7: Save and return monitor ID monitor_id = monitor_def.metadata.id self.results["custom_monitor_id"] = monitor_id print(f"Custom monitor created with ID: {monitor_id}") return monitor_id def _create_monitor_instance(self,config, custom_monitor_id, integrated_system_id): deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower() existing_instances = self._ai_client.monitor_instances.list().result.monitor_instances monitor_instance_id = None try: for instance in existing_instances: if instance.entity.monitor_definition_id == custom_monitor_id: monitor_instance_id = instance.metadata.id if config.get("DELETE_CUSTOM_MONITOR_INSTANCE", False): print(f"Deleting existing monitor instance: {monitor_instance_id}") 
self._ai_client.monitor_instances.delete(monitor_instance_id,background_mode=False) monitor_instance_id = None break # Proceed to create a new one else: print(f"Updating existing monitor instance: {monitor_instance_id}") if deployment_type == "wml_online": patch_payload = [ { "op": "replace", "path": "/parameters", "value": { "custom_metrics_provider_id": integrated_system_id, "custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"], "enable_custom_metric_runs": True } } ] elif deployment_type == "wml_batch": custom_metrics_wait_time = config.get("custom_metrics_wait_time", 120) patch_payload = [ { "op": "replace", "path": "/parameters", "value": { "custom_metrics_provider_id": integrated_system_id, "custom_metrics_provider_type": "wml_batch", "custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"], "space_id": config['SPACE_ID'], "deployment_id": self.deployment_id, "hardware_spec_id": self.hardware_spec_id, "enable_custom_metric_runs": True } } ] self._ai_client.monitor_instances.update( monitor_instance_id, patch_payload, update_metadata_only=True ) self.results["custom_monitor_instance_id"] = monitor_instance_id return monitor_instance_id # No instance found or it was deleted — create a new one print(f"Creating new monitor instance for monitor definition: {custom_monitor_id}") target = Target( target_type=TargetTypes.SUBSCRIPTION, target_id=config["SUBSCRIPTION_ID"] ) if deployment_type == "wml_online": parameters = { "custom_metrics_provider_id": integrated_system_id, "custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"], "enable_custom_metric_runs": True } elif deployment_type == "wml_batch": parameters = { "custom_metrics_provider_id": integrated_system_id, "custom_metrics_provider_type": "wml_batch", "custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"], "space_id": config['SPACE_ID'], "deployment_id": self.deployment_id, "hardware_spec_id": self.hardware_spec_id, "enable_custom_metric_runs": True } instance = 
self._ai_client.monitor_instances.create( data_mart_id=config["DATAMART_ID"], background_mode=False, monitor_definition_id=custom_monitor_id, target=target, parameters=parameters ).result monitor_instance_id = instance.metadata.id print(f"Monitor instance created: {monitor_instance_id}") self.results["custom_monitor_instance_id"] = monitor_instance_id return monitor_instance_id except Exception as e: print(f"Error creating monitor instance : {e}") def _associate_integrated_system(self, integrated_system_id, custom_monitor_id): response = self._ai_client.integrated_systems.update(integrated_system_id, [{ "op": "add", "path": "/parameters", "value": {"monitor_definition_ids": [custom_monitor_id]} }]) result = response.result return result
def get_results(self):
    """
    Return the identifiers collected by the setup steps so far.

    :return: The instance's ``results`` dictionary (the same object, not a copy).
    :rtype: dict
    """
    collected = self.results
    return collected
def get_config(self):
    """
    Return the configuration currently stored on the instance.

    :return: The instance's ``config`` dictionary (the same object, not a copy).
    :rtype: dict
    """
    current = self.config
    return current
def get_monitor_instance_config(self, config):
    """
    Look up the monitor instances of the configured custom monitor.

    The monitor definition is found by name (``CUSTOM_MONITOR_NAME``,
    defaulting to ``"Sample Model Performance"``) and its instances are
    listed for the subscription given by ``SUBSCRIPTION_ID``.

    :param dict config: Reads ``SUBSCRIPTION_ID`` and ``CUSTOM_MONITOR_NAME``.
    :return: The monitor instance listing as a dictionary, or ``None`` when
        no monitor definition with the configured name exists.
    :rtype: dict or None
    """
    target = config.get("SUBSCRIPTION_ID")
    monitor_name = config.get("CUSTOM_MONITOR_NAME", "Sample Model Performance")

    existing_monitors = self._ai_client.monitor_definitions.list().result.monitor_definitions
    monitor_definition_id = next(
        (monitor.metadata.id for monitor in existing_monitors
         if monitor.entity.name == monitor_name),
        None,
    )
    if monitor_definition_id is None:
        # Nothing to query; without this guard the lookup below would run
        # with an undefined monitor definition ID.
        return None

    print(f"Monitor instance details picking for Subscription: {target} ,monitor definition: {monitor_definition_id} ")
    return self._ai_client.monitor_instances.list(
        target_target_id=target,
        monitor_definition_id=monitor_definition_id,
    ).result.to_dict()
def get_custom_monitor_configuration(self, config):
    """
    Return the custom monitor identifiers, from memory or from the service.

    When ``get_results()`` is non-empty it is returned as-is. Otherwise the
    first monitor instance returned by ``get_monitor_instance_config`` is
    flattened into a dictionary, and the matching custom dataset (table name
    ending in ``<custom_monitor_id>_<SUBSCRIPTION_ID>``) is looked up.

    :param dict config: Setup configuration; ``SUBSCRIPTION_ID`` is read for
        the dataset lookup.
    :return: Dictionary of identifiers; empty when no usable monitor instance
        was found.
    :rtype: dict
    """
    cached = self.get_results()
    if cached:
        return cached

    summary = {}
    response = self.get_monitor_instance_config(config=config)
    instances = response.get("monitor_instances") if response else None
    usable = bool(instances) and isinstance(instances, list) and isinstance(instances[0], dict)
    if not usable:
        print("Monitor instance config is missing or improperly structured.")
        return summary

    first = instances[0]
    entity = first.get("entity", {})
    params = entity.get("parameters", {})

    summary["custom_monitor_instance_id"] = first.get("metadata", {}).get("id")
    summary["custom_monitor_id"] = entity.get("monitor_definition_id")

    # custom_metrics_provider_id is used for both custom monitors and LLMaaJ.
    provider_id = params.get("custom_metrics_provider_id")
    summary["integrated_system_id"] = provider_id

    # Prefer the LLM provider named in metrics_configuration; otherwise fall
    # back to the metrics provider ID when there is one.
    judge_settings = params.get("metrics_configuration", {})
    llm_id = (judge_settings and judge_settings.get("llm_provider_id")) or provider_id
    if llm_id:
        summary["llm_provider_id"] = llm_id

    # Always reported, even when absent from the instance parameters.
    for key in ("deployment_id", "hardware_spec_id", "space_id", "project_id",
                "custom_metrics_provider_type", "custom_metrics_wait_time",
                "custom_metrics_provider_name"):
        summary[key] = params.get(key)

    # LLMaaJ-specific fields
    if judge_settings:
        summary["metrics_configuration"] = judge_settings
        summary["min_records"] = params.get("min_records")
        summary["max_records"] = params.get("max_records")

    # RAG-specific fields are only reported when they carry a value.
    for key in ("context_fields", "question_field", "meta_fields", "data_properties"):
        value = params.get(key)
        if value:
            summary[key] = value

    # Fetch custom_dataset_id by querying data_sets API
    monitor_id = summary.get("custom_monitor_id")
    subscription_id = config.get("SUBSCRIPTION_ID")
    if monitor_id and subscription_id:
        try:
            candidates = self._ai_client.data_sets.list(
                target_target_id=subscription_id,
                type='custom'
            ).result.data_sets
            suffix = f"{monitor_id}_{subscription_id}"
            for candidate in candidates:
                if not hasattr(candidate.entity, 'location'):
                    continue
                if not hasattr(candidate.entity.location, 'table_name'):
                    continue
                if candidate.entity.location.table_name.endswith(suffix):
                    summary["custom_dataset_id"] = candidate.metadata.id
                    summary["custom_dataset_table_name"] = candidate.entity.location.table_name
                    break
        except Exception as e:
            print(f"Warning: Could not fetch custom dataset info: {str(e)}")

    return summary
def create_custom_dataset(self, data_mart_id: str, subscription_id: str, custom_monitor_id: str) -> dict:
    """
    Create a custom dataset for storing record-level metrics for custom monitors.

    The dataset is created with the default columns ``record_id``,
    ``record_timestamp``, ``run_id``, ``computed_on`` and ``data_set_id``, and
    its table is named ``{custom_monitor_id}_{subscription_id}``.

    :param str data_mart_id: Watson OpenScale DataMart GUID
    :param str subscription_id: ID of the subscription to be monitored
    :param str custom_monitor_id: Custom monitor definition ID

    :return: A dictionary with ``custom_dataset_id``, ``table_name``,
        ``custom_monitor_id``, ``subscription_id`` and the raw ``response``
    :rtype: dict
    :raises Exception: Any error from the data set API is printed and re-raised

    Example Usage:
        >>> custom_dataset_info = wos_client.custom_monitor.create_custom_dataset(
                data_mart_id='00000000-0000-0000-0000-000000000000',
                subscription_id='subscription_id_here',
                custom_monitor_id='custom_monitor_id_here'
            )
    """
    from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Target
    from ibm_watson_openscale.supporting_classes.enums import TargetTypes

    def column(name, column_type, metadata):
        """Describe one non-nullable column of the dataset schema."""
        return {"metadata": metadata, "name": name, "nullable": False, "type": column_type}

    # Required default columns of the record-level metrics table.
    schema = {
        "fields": [
            column("record_id", "string", {"modeling_role": "record-id", "primary_key": True}),
            column("record_timestamp", "timestamp", {"modeling_role": "record-timestamp"}),
            column("run_id", "string", {"columnInfo": {"columnIndex": "DEFAULT"}}),
            column("computed_on", "string", {}),
            column("data_set_id", "string", {}),
        ],
        "type": "struct",
    }

    subscription_target = Target(
        target_type=TargetTypes.SUBSCRIPTION,
        target_id=subscription_id
    )

    # Naming convention: {custom_monitor_id}_{subscription_id}
    table_name = f"{custom_monitor_id}_{subscription_id}"

    try:
        print(f"Creating custom dataset with table name: {table_name}")
        response = self._ai_client.data_sets.add(
            target=subscription_target,
            name=custom_monitor_id,
            description=f"{custom_monitor_id} record-level metrics table",
            type='custom',
            data_schema=schema,
            data_mart_id=data_mart_id,
            managed_by=custom_monitor_id,
            location={"table_name": table_name},
            background_mode=True
        )
        created = response.result
        dataset_id = created.metadata.id
        info = {
            "custom_dataset_id": dataset_id,
            "table_name": table_name,
            "custom_monitor_id": custom_monitor_id,
            "subscription_id": subscription_id,
            "response": created.to_dict()
        }
        print(f"Custom dataset created successfully with ID: {dataset_id}")

        # Store in results for future reference
        self.results["custom_dataset_id"] = dataset_id
        self.results["custom_dataset_table_name"] = table_name
        return info
    except Exception as e:
        print(f"Error creating custom dataset: {str(e)}")
        raise
def delete_custom_dataset(self, custom_monitor_id: Optional[str] = None, subscription_id: Optional[str] = None, custom_dataset_id: Optional[str] = None):
    """
    Delete the custom dataset associated with a custom monitor.

    This is best-effort: every failure, including missing arguments, is
    printed as a warning and not raised.

    :param str custom_monitor_id: (Optional) Custom monitor definition ID. Required if custom_dataset_id is not provided.
    :param str subscription_id: (Optional) ID of the subscription. Required if custom_dataset_id is not provided.
    :param str custom_dataset_id: (Optional) Custom dataset ID. If provided, directly deletes the dataset without searching.

    Example usage:
        >>> # Delete by searching for the dataset
        >>> wos_client.custom_monitor.delete_custom_dataset(
                custom_monitor_id='custom_monitor_id_here',
                subscription_id='subscription_id_here'
            )

        >>> # Delete directly using dataset ID (custom_monitor_id and subscription_id not required)
        >>> wos_client.custom_monitor.delete_custom_dataset(
                custom_dataset_id='dataset_id_here'
            )
    """
    try:
        # Direct deletion when the dataset ID is already known.
        if custom_dataset_id:
            print(f"Deleting custom dataset with ID: {custom_dataset_id}")
            self._ai_client.data_sets.delete(data_set_id=custom_dataset_id, force=True, background_mode=False)
            print(f"Custom dataset {custom_dataset_id} deleted successfully")
            return

        # Search-based deletion needs both identifiers to build the table name.
        if not (custom_monitor_id and subscription_id):
            raise ValueError("custom_monitor_id and subscription_id are required when custom_dataset_id is not provided")

        suffix = f"{custom_monitor_id}_{subscription_id}"
        candidates = self._ai_client.data_sets.list(
            target_target_id=subscription_id,
            type='custom'
        ).result.data_sets

        for candidate in candidates:
            if not hasattr(candidate.entity, 'location'):
                continue
            location = candidate.entity.location
            if not hasattr(location, 'table_name'):
                continue
            # Suffix match, because the stored table name may carry a prefix.
            if not location.table_name.endswith(suffix):
                continue
            found_id = candidate.metadata.id
            print(f"Deleting custom dataset: {found_id} (table: {candidate.entity.location.table_name})")
            self._ai_client.data_sets.delete(data_set_id=found_id, force=True, background_mode=False)
            print(f"Custom dataset {found_id} deleted successfully")
            return

        print(f"No custom dataset found with table name suffix: {suffix}")
    except Exception as e:
        print(f"Warning: Error deleting custom dataset: {str(e)}")
def setup_llm_judge_configuration(self, config):
    """
    Configure an LLM-as-a-Judge (LLMaaJ) custom monitor end to end.

    The steps run in this order:

    1. Validate ``config`` and merge ``CUSTOM_MONITOR_CONFIG`` on top of it.
    2. Create (or reuse) the monitor definition with LLM grader metrics.
    3. Create (or reuse) the ``generative_ai_evaluator`` integrated system.
    4. Invoke the prompt setup API and wait for its outcome.
    5. Look up the monitor instance ID.
    6. Look up the custom dataset of the monitor (retried a few times).
    7. Patch the monitor instance with the custom dataset ID when both
       IDs are known.

    :param dict config: Configuration with LLM_PROVIDER_CONFIG,
        CUSTOM_MONITOR_CONFIG, PROMPT_TEMPLATE_ASSET_ID, LABEL_COLUMN,
        OPERATIONAL_SPACE_ID, PROBLEM_TYPE, DATAMART_ID and optional
        parameters.
    :return: ``self.results`` updated with llm_provider, llm_provider_id,
        custom_monitor_id, subscription_id, service_provider_id,
        monitor_instance_id, custom_dataset_id, custom_dataset_table_name
        and prompt_setup_response.
    :rtype: dict
    :raises ValueError: If the configuration is invalid or the prompt
        setup response carries no subscription_id.
    """
    import time

    logging.info("Starting LLMaaJ custom metrics setup")
    self._validate_llmaj_config(config)

    # Monitor-specific keys override the top-level ones.
    merged_config = dict(config)
    merged_config.update(config.get('CUSTOM_MONITOR_CONFIG', {}))
    self.config = merged_config

    # Monitor definition carrying the LLM grader metrics.
    monitor_definition_id = self.create_llmaj_monitor_definition(merged_config)
    logging.info(f"Created custom monitor definition: {monitor_definition_id}")

    # Integrated system of type generative_ai_evaluator.
    provider_config = config['LLM_PROVIDER_CONFIG']
    llm_provider = provider_config.get('type', '').lower()
    llm_provider_id = self.create_llm_provider(
        llm_provider,
        provider_config,
        merged_config.get('DELETE_LLM_PROVIDER', True),
    )
    logging.info(f"Created LLM provider: {llm_provider_id}")

    # Prompt setup creates the subscription and the monitor instance.
    prompt_setup_response = self._invoke_prompt_setup(
        merged_config, monitor_definition_id, llm_provider_id
    )
    subscription_id = prompt_setup_response.get('subscription_id')
    service_provider_id = prompt_setup_response.get('service_provider_id')
    if not subscription_id:
        raise ValueError("subscription_id not found in prompt_setup response")
    logging.info(f"Prompt setup completed. Subscription ID: {subscription_id}, Service Provider ID: {service_provider_id}")

    monitor_instance_id = self._get_monitor_instance_id(subscription_id, monitor_definition_id)
    logging.info(f"Monitor instance ID: {monitor_instance_id}")

    # The custom dataset may not be listed immediately, hence the retries.
    custom_dataset_id = None
    custom_dataset_table_name = None
    max_retries = 5
    retry_delay = 2  # seconds
    table_name_suffix = f"{monitor_definition_id}_{subscription_id}"

    def _belongs_to_monitor(candidate):
        # A dataset qualifies when its table name ends with "<monitor>_<subscription>".
        return (
            hasattr(candidate.entity, 'location')
            and hasattr(candidate.entity.location, 'table_name')
            and candidate.entity.location.table_name.endswith(table_name_suffix)
        )

    for attempt in range(1, max_retries + 1):
        can_retry = attempt < max_retries
        try:
            listed = self._ai_client.data_sets.list(
                target_target_id=subscription_id,
                type='custom'
            ).result.data_sets

            found = next((item for item in listed if _belongs_to_monitor(item)), None)
            if found is not None:
                custom_dataset_id = found.metadata.id
                custom_dataset_table_name = found.entity.location.table_name
                logging.info(f"Custom dataset ID found: {custom_dataset_id}")

            if custom_dataset_id:
                break

            if can_retry:
                logging.info(f"Custom dataset not found yet, retrying in {retry_delay}s (attempt {attempt}/{max_retries})")
                time.sleep(retry_delay)
            else:
                logging.warning(f"Custom dataset not found after {max_retries} attempts")
        except Exception as e:
            if can_retry:
                logging.warning(f"Error fetching custom dataset (attempt {attempt}/{max_retries}): {str(e)}")
                time.sleep(retry_delay)
            else:
                logging.warning(f"Could not fetch custom dataset info after {max_retries} attempts: {str(e)}")

    # Best effort: a failed patch is logged, it does not abort the setup.
    if custom_dataset_id and monitor_instance_id:
        try:
            logging.info(f"Updating monitor instance {monitor_instance_id} with custom_dataset_id: {custom_dataset_id}")
            self._ai_client.monitor_instances.update(
                monitor_instance_id=monitor_instance_id,
                patch_document=[
                    {
                        "op": "add",
                        "path": "/parameters/custom_dataset_id",
                        "value": custom_dataset_id
                    }
                ]
            )
            logging.info("Monitor instance updated successfully with custom_dataset_id")
        except Exception as e:
            logging.warning(f"Failed to update monitor instance with custom_dataset_id: {str(e)}")

    self.results.update({
        'llm_provider': llm_provider,
        'llm_provider_id': llm_provider_id,
        'custom_monitor_id': monitor_definition_id,
        'subscription_id': subscription_id,
        'service_provider_id': service_provider_id,
        'monitor_instance_id': monitor_instance_id,
        'custom_dataset_id': custom_dataset_id,
        'custom_dataset_table_name': custom_dataset_table_name,
        'prompt_setup_response': prompt_setup_response
    })

    logging.info("LLMaaJ custom metrics setup completed successfully")
    return self.results
def create_llmaj_monitor_definition(self, config):
    """
    Create (or reuse) the monitor definition used for LLM-as-a-Judge.

    If a monitor definition named ``config['CUSTOM_MONITOR_NAME']`` already
    exists it is either reused (its ID is returned immediately) or, when
    ``DELETE_CUSTOM_MONITOR`` is truthy, deleted together with its monitor
    instances and custom datasets before a fresh definition is created.

    Note that ``config`` is modified in place: when ``ENABLE_SCHEDULE`` is
    ``True`` and no ``SCHEDULE`` is present, a default schedule is stored
    under ``config['SCHEDULE']``.

    :param dict config: Configuration dictionary. Reads CUSTOM_MONITOR_NAME,
        MONITOR_METRICS, PROBLEM_TYPE, DELETE_CUSTOM_MONITOR, ENABLE_SCHEDULE
        and SCHEDULE.
    :return: Monitor definition ID
    :rtype: str
    """
    is_schedule = "SCHEDULE" in config

    # Fall back to a default schedule when scheduling is requested without details.
    if config.get('ENABLE_SCHEDULE') is True and not is_schedule:
        config["SCHEDULE"] = {
            "repeat_interval": 60,
            "repeat_type": "minute",
            "delay_unit": "minute",
            "delay_time": 5
        }
        is_schedule = True
        logging.info("ENABLE_SCHEDULE is True but SCHEDULE not configured. Using default schedule values.")

    if is_schedule:
        schedule_info = config["SCHEDULE"]

        def _schedule_value(upper_key, lower_key):
            # Only fall back to the lowercase key when the uppercase one is
            # absent/None, so explicit falsy values such as a delay of 0 survive.
            value = schedule_info.get(upper_key)
            return schedule_info.get(lower_key) if value is None else value

        repeat_interval = _schedule_value("REPEAT_INTERVAL", "repeat_interval")
        repeat_type = _schedule_value("REPEAT_TYPE", "repeat_type")
        delay_unit = _schedule_value("DELAY_UNIT", "delay_unit")
        delay_time = _schedule_value("DELAY_TIME", "delay_time")

    # Reuse or replace a monitor definition that already carries this name.
    existing_monitors = self._ai_client.monitor_definitions.list().result.monitor_definitions
    for monitor in existing_monitors:
        if monitor.entity.name != config['CUSTOM_MONITOR_NAME']:
            continue

        if not config.get('DELETE_CUSTOM_MONITOR', False):
            logging.info(f"Reusing existing monitor: {monitor.entity.name}")
            self.results["custom_monitor_id"] = monitor.metadata.id
            return monitor.metadata.id

        logging.info(f"Deleting existing monitor: {monitor.entity.name}")
        monitor_id = monitor.metadata.id

        # Monitor instances and their custom datasets must go before the definition.
        try:
            monitor_instances = self._ai_client.monitor_instances.list(
                monitor_definition_id=monitor_id
            ).result.monitor_instances

            for instance in monitor_instances:
                instance_id = instance.metadata.id
                subscription_id = instance.entity.target.target_id

                logging.info(f"Deleting monitor instance: {instance_id} for subscription: {subscription_id}")

                try:
                    self.delete_custom_dataset(monitor_id, subscription_id)
                except Exception as e:
                    logging.warning(f"Error deleting custom dataset: {str(e)}")

                try:
                    self._ai_client.monitor_instances.delete(instance_id, background_mode=False)
                    logging.info(f"Monitor instance {instance_id} deleted successfully")
                except Exception as e:
                    logging.warning(f"Error deleting monitor instance: {str(e)}")
        except Exception as e:
            logging.warning(f"Error cleaning up monitor instances: {str(e)}")

        # Deletion is best effort: a failure is logged and creation still proceeds.
        try:
            self._ai_client.monitor_definitions.delete(monitor_id, background_mode=False)
            logging.info(f"Monitor definition {monitor_id} deleted successfully")
        except Exception as e:
            logging.warning(f"Error deleting monitor definition (may already be deleted): {str(e)}")

    # Applicability is declared once, at monitor level.
    supported_problem_types = [
        'question_answering',
        'summarization',
        'generation',
        'extraction',
        'retrieval_augmented_generation',
        'classification'
    ]
    if 'PROBLEM_TYPE' in config:
        problem_type = [config['PROBLEM_TYPE']]
    else:
        problem_type = supported_problem_types
        logging.info(f"PROBLEM_TYPE not provided, defaulting to all supported types: {supported_problem_types}")

    monitor_level_applicability = ApplicabilitySelection(
        input_data_type=['unstructured_text'],
        problem_type=problem_type
    )

    # Build one metric request per MONITOR_METRICS entry (no per-metric applies_to).
    metrics = []
    for metric_cfg in config.get('MONITOR_METRICS', []):
        metric_name = metric_cfg.get('name')
        metric_description = metric_cfg.get('description', '')

        # Prompt and grading options may sit under 'computation' or at metric root.
        computation_cfg = metric_cfg.get('computation') or {}
        prompt = computation_cfg.get('prompt') or metric_cfg.get('prompt')
        grading_options = computation_cfg.get('grading_options') or metric_cfg.get('grading_options')

        thresholds = []
        for threshold_type, value in (metric_cfg.get('thresholds') or {}).items():
            normalized_type = threshold_type.lower()
            if normalized_type == 'lower_limit':
                thresholds.append(MetricThreshold(
                    type=MetricThresholdTypes.LOWER_LIMIT,
                    default=value
                ))
            elif normalized_type == 'upper_limit':
                thresholds.append(MetricThreshold(
                    type=MetricThresholdTypes.UPPER_LIMIT,
                    default=value
                ))
            else:
                logging.warning(f"Unknown threshold type '{threshold_type}' for metric '{metric_name}'")

        metric_request = MonitorMetricRequest(
            name=metric_name,
            description=metric_description,
            thresholds=thresholds
        )

        # Attach the llm_grader computation only when both parts are configured.
        if prompt is not None and grading_options is not None:
            from ibm_watson_openscale.base_classes.watson_open_scale_v2 import (
                MetricComputationSchemaLLMGraderComputation, GradingOption
            )

            # Plain dicts are converted; anything else is passed through as given.
            grading_option_objects = [
                GradingOption(
                    name=opt.get('name'),
                    value=opt.get('value'),
                    description=opt.get('description')
                ) if isinstance(opt, dict) else opt
                for opt in grading_options
            ]

            metric_request.computation = MetricComputationSchemaLLMGraderComputation(
                type="llm_grader",
                prompt=prompt,
                grading_options=grading_option_objects
            )

        metrics.append(metric_request)

    # A single default tag is always sent with the definition.
    tags = [MonitorTagRequest(
        name="computed_on",
        description="used to set the dataset type"
    )]

    logging.info(f"Creating monitor definition with {len(tags)} tags and {len(metrics)} metrics")

    if config.get('ENABLE_SCHEDULE', False) and is_schedule:
        schedule = MonitorInstanceSchedule(
            repeat_interval=repeat_interval,
            repeat_unit=repeat_type,
            start_time=ScheduleStartTime(
                type="relative", delay_unit=delay_unit, delay=delay_time)
        )
        logging.info(f"Schedule configured: {repeat_interval} {repeat_type}, delay: {delay_time} {delay_unit}")
    else:
        schedule = None
        if config.get('ENABLE_SCHEDULE', False) and not is_schedule:
            logging.warning("ENABLE_SCHEDULE is True but SCHEDULE configuration is missing. Schedule will not be set.")

    monitor_runtime = {"type": "llm_as_a_judge"}

    monitor_def = self._ai_client.monitor_definitions.add(
        name=config['CUSTOM_MONITOR_NAME'],
        metrics=metrics,
        tags=tags,
        applies_to=monitor_level_applicability,
        background_mode=False,
        monitor_runtime=monitor_runtime,
        schedule=schedule
    ).result

    monitor_id = monitor_def.metadata.id
    self.results["custom_monitor_id"] = monitor_id

    logging.info(f"LLMaaJ monitor definition created with ID: {monitor_id}")
    return monitor_id
def _get_monitor_instance_id(self, subscription_id, custom_monitor_id):
    """
    Get monitor instance ID for a given subscription and custom monitor.

    Any error raised while listing the instances is logged as a warning
    and reported as "not found".

    :param str subscription_id: Subscription ID
    :param str custom_monitor_id: Custom monitor definition ID
    :return: ID of the first matching monitor instance, or None if there is
        none or the lookup failed
    :rtype: str or None
    """
    try:
        monitor_instances = self._ai_client.monitor_instances.list(
            target_target_id=subscription_id,
            monitor_definition_id=custom_monitor_id
        ).result.monitor_instances

        if monitor_instances:
            return monitor_instances[0].metadata.id
        return None
    except Exception as e:
        logging.warning(f"Could not retrieve monitor instance ID: {str(e)}")
        return None


def _validate_llmaj_config(self, config):
    """
    Validate the LLMaaJ configuration and fill in LLMaaJ defaults.

    ``config`` is modified in place: ``INPUT_DATA_TYPE`` is defaulted to
    ``'unstructured_text'``, and ``INPUT_DATA_TYPES`` / ``ALGORITHM_TYPES``
    are defaulted inside ``config['CUSTOM_MONITOR_CONFIG']``.

    Credentials are checked by value, not just by key presence, so an empty
    ``apikey`` is rejected here rather than later during provider creation.

    :param dict config: Configuration dictionary
    :raises ValueError: If required parameters are missing or invalid
    """
    if not config.get('INPUT_DATA_TYPE'):
        config['INPUT_DATA_TYPE'] = 'unstructured_text'
        logging.info("INPUT_DATA_TYPE not provided, defaulting to 'unstructured_text' for LLMaaJ")

    # SUBSCRIPTION_ID is deliberately absent: the prompt setup API creates it.
    required_fields = [
        'LLM_PROVIDER_CONFIG', 'CUSTOM_MONITOR_CONFIG',
        'PROMPT_TEMPLATE_ASSET_ID', 'LABEL_COLUMN', 'OPERATIONAL_SPACE_ID',
        'PROBLEM_TYPE', 'DATAMART_ID'
    ]
    missing_fields = [field for field in required_fields if not config.get(field)]
    if missing_fields:
        raise ValueError(f"Missing required configuration fields: {', '.join(missing_fields)}")

    provider_config = config.get('LLM_PROVIDER_CONFIG', {})

    # str(... or '') keeps a None / non-string 'type' from raising AttributeError.
    llm_provider = str(provider_config.get('type') or '').lower()
    if not llm_provider:
        raise ValueError("Missing 'type' field in LLM_PROVIDER_CONFIG")

    if llm_provider not in ['watsonx', 'watsonx.ai', 'openai']:
        raise ValueError(f"Invalid LLM provider type: {llm_provider}. Must be 'watsonx', 'watsonx.ai', or 'openai'")

    if llm_provider in ['watsonx', 'watsonx.ai']:
        has_apikey = bool(provider_config.get('apikey') or provider_config.get('API_KEY'))

        if self._ai_client.is_cp4d:
            # CPD: url plus either an apikey or a username/password pair.
            if not provider_config.get('url'):
                raise ValueError("Missing required CPD field 'url' in LLM_PROVIDER_CONFIG")

            has_username_password = bool(
                (provider_config.get('CPD_USERNAME') and provider_config.get('CPD_PASSWORD'))
                or (provider_config.get('username') and provider_config.get('password'))
            )
            if not has_apikey and not has_username_password:
                raise ValueError("CPD deployment requires either 'apikey' OR ('CPD_USERNAME' + 'CPD_PASSWORD') in LLM_PROVIDER_CONFIG")
        elif not has_apikey:
            raise ValueError("Cloud deployment requires 'apikey' (or 'API_KEY') in LLM_PROVIDER_CONFIG")
    else:  # openai
        required_provider_fields = ['API_KEY', 'MODEL_NAME']
        missing_provider_fields = [
            field for field in required_provider_fields if not provider_config.get(field)
        ]
        if missing_provider_fields:
            raise ValueError(f"Missing required {llm_provider} provider fields: {', '.join(missing_provider_fields)}")

    monitor_config = config.get('CUSTOM_MONITOR_CONFIG', {})

    if not monitor_config.get('INPUT_DATA_TYPES'):
        monitor_config['INPUT_DATA_TYPES'] = ['unstructured_text']
        logging.info("INPUT_DATA_TYPES not provided in CUSTOM_MONITOR_CONFIG, defaulting to ['unstructured_text'] for LLMaaJ")

    if not monitor_config.get('ALGORITHM_TYPES'):
        # PROBLEM_TYPE is guaranteed non-empty by the required-field check above.
        monitor_config['ALGORITHM_TYPES'] = [config['PROBLEM_TYPE']]
        logging.info(f"ALGORITHM_TYPES not provided in CUSTOM_MONITOR_CONFIG, using PROBLEM_TYPE: ['{config['PROBLEM_TYPE']}']")

    # Metrics must be user supplied; there is no sensible default set.
    required_monitor_fields = ['CUSTOM_MONITOR_NAME', 'MONITOR_METRICS']
    missing_monitor_fields = [
        field for field in required_monitor_fields if not monitor_config.get(field)
    ]
    if missing_monitor_fields:
        raise ValueError(f"Missing required custom monitor fields: {', '.join(missing_monitor_fields)}")
def create_llm_provider(self, llm_provider_type, provider_config, delete_llm_provider=True):
    """
    Create an integrated system for LLM evaluator (generative_ai_evaluator type).

    An integrated system with the computed name that already exists is
    either deleted first (``delete_llm_provider=True``) or reused, in which
    case its ID is returned without creating anything.

    :param str llm_provider_type: Provider type ('watsonx', 'watsonx.ai', or
        'openai'); 'watsonx' is treated as an alias of 'watsonx.ai'
    :param dict provider_config: Provider-specific configuration
        (credentials, url, wml_location, model and space/project IDs; both
        upper- and lowercase keys are read)
    :param bool delete_llm_provider: Whether to delete existing LLM provider if found (default: True)
    :return: Integrated system ID
    :rtype: str
    :raises ValueError: If the provider type is unsupported or required
        credentials / WML location are missing
    """
    is_cpd = self._ai_client.is_cp4d

    # 'watsonx' passes configuration validation, so accept it here as well.
    if llm_provider_type == 'watsonx':
        provider_type = 'watsonx.ai'
    elif llm_provider_type in ('watsonx.ai', 'openai'):
        provider_type = llm_provider_type
    else:
        raise ValueError("The provider type should be one of watsonx, watsonx.ai or openai ")

    # The system name/description use the 'watsonx' token for every provider type;
    # kept as-is so previously created systems are still found by name.
    llm_provider_type = 'watsonx'

    datamart_id = self._ai_client.service_instance_id
    provider_name = f"LLM_Evaluator_{llm_provider_type}_{datamart_id}"

    # Delete or reuse an integrated system that already has this name.
    existing_systems = IntegratedSystems(self._ai_client).list().result.integrated_systems
    for system in existing_systems:
        if system.entity.name == provider_name:
            if delete_llm_provider:
                logging.info(f"Deleting existing LLM provider: {provider_name}")
                IntegratedSystems(self._ai_client).delete(integrated_system_id=system.metadata.id)
            else:
                logging.info(f"Reusing existing LLM provider: {provider_name}")
                self.results["llm_provider_id"] = system.metadata.id
                self.results["llm_provider_name"] = system.entity.name
                return system.metadata.id

    credentials = {}

    wml_location = provider_config.get('WML_LOCATION') or provider_config.get('wml_location')
    if wml_location:
        credentials['wml_location'] = wml_location
    elif provider_type == 'watsonx.ai':
        raise ValueError("WML Location is required for watsonx.ai type ")

    url = provider_config.get('URL') or provider_config.get('url')
    if url:
        credentials['url'] = url

    apikey = provider_config.get('API_KEY') or provider_config.get('apikey')

    if is_cpd:
        username = (provider_config.get('CPD_USERNAME')
                    or provider_config.get('USERNAME')
                    or provider_config.get('username'))
        password = (provider_config.get('CPD_PASSWORD')
                    or provider_config.get('PASSWORD')
                    or provider_config.get('password'))

        # Parenthesised on purpose: without the grouping `and` binds tighter than
        # `or`, and an OpenAI provider would store apikey=None.
        if apikey and (wml_location == 'cloud_remote' or provider_type == 'openai'):
            credentials['apikey'] = apikey
        elif apikey and username:
            credentials['auth_type'] = 'basic'
            credentials['auth_provider'] = 'cpd'
            credentials['username'] = username
            credentials['apikey'] = apikey
            logging.info("Using username+apikey for CPD authentication with auth_type=basic")
        elif username and password:
            credentials['auth_type'] = 'basic'
            credentials['auth_provider'] = 'cpd'
            credentials['username'] = username
            # The password is sent under both keys, as before.
            credentials['apikey'] = password
            credentials['password'] = password
            logging.info("Using username+password for CPD authentication with auth_type=basic")
        else:
            raise ValueError("CPD deployment requires either 'apikey' OR ('username' + 'password'/'apikey')")
    else:
        if not apikey:
            raise ValueError("Cloud deployment requires 'apikey'")
        credentials['auth_type'] = 'api_key'
        credentials['apikey'] = apikey
        credentials['auth_provider'] = 'cloud'
        credentials['auth_url'] = 'https://iam.cloud.ibm.com/identity/token'
        logging.info("Using apikey for Cloud authentication with auth_provider=cloud")

    model_id = (provider_config.get('model_id')
                or provider_config.get('MODEL_NAME')
                or provider_config.get('MODEL_ID'))

    parameters = {
        "evaluator_type": provider_type,
        "model_id": model_id
    }

    # Only watsonx.ai evaluators carry a space_id / project_id; OpenAI does not.
    if provider_type != 'openai':
        space_id = provider_config.get('SPACE_ID') or provider_config.get('space_id')
        project_id = provider_config.get('PROJECT_ID') or provider_config.get('project_id')

        if space_id:
            parameters['space_id'] = space_id
            logging.info(f"[DEBUG] Added space_id to integrated system parameters: {space_id}")
        elif project_id:
            parameters['project_id'] = project_id
            logging.info(f"[DEBUG] Added project_id to integrated system parameters: {project_id}")
        else:
            logging.warning("[DEBUG] No space_id/project_id found in llm_provider_config!")

    payload = {
        "name": provider_name,
        "description": f"LLM evaluator system for {llm_provider_type}",
        "type": "generative_ai_evaluator",
        "parameters": parameters,
        "credentials": credentials
    }

    system = self._ai_client.integrated_systems.add(**payload).result

    self.results["llm_provider_id"] = system.metadata.id
    self.results["llm_provider_name"] = system.entity.name
    return system.metadata.id
def _invoke_prompt_setup(self, config, custom_monitor_id, llm_provider_id):
    """
    Invoke the Prompt Setup API to create subscription and monitor instance.

    The request is submitted through ``WOS.execute_prompt_setup`` and its
    outcome is then obtained by polling (see ``_poll_prompt_setup``).

    Configuration keys PROJECT_ID, SPACE_ID, DEPLOYMENT_ID, CONTEXT_FIELDS
    and QUESTION_FIELD are read in upper- or lowercase form.

    :param dict config: Configuration dictionary
    :param str custom_monitor_id: Custom monitor definition ID
    :param str llm_provider_id: LLM evaluator integrated system ID
    :return: Prompt setup response with subscription_id and service_provider_id
    :rtype: dict
    :raises ValueError: If neither PROJECT_ID nor SPACE_ID is given, or
        SPACE_ID is given without DEPLOYMENT_ID
    """
    project_id = config.get('PROJECT_ID') or config.get('project_id')
    space_id = config.get('SPACE_ID') or config.get('space_id')
    deployment_id = config.get('DEPLOYMENT_ID') or config.get('deployment_id')

    if not project_id and not space_id:
        raise ValueError("Either PROJECT_ID or SPACE_ID must be provided in the configuration")
    if space_id and not deployment_id:
        raise ValueError("DEPLOYMENT_ID is mandatory when SPACE_ID is provided")

    # The payload must reference metric IDs, so map configured names to the IDs
    # stored on the monitor definition (falling back to the name itself).
    monitor_def = self._ai_client.monitor_definitions.get(
        monitor_definition_id=custom_monitor_id
    ).result
    metric_name_to_id = {}
    for defined_metric in getattr(monitor_def.entity, 'metrics', None) or []:
        if hasattr(defined_metric, 'name') and hasattr(defined_metric, 'id'):
            metric_name_to_id[defined_metric.name] = defined_metric.id

    metrics = []      # per-metric entries for metrics_configuration
    thresholds = []   # monitor-level threshold entries
    for metric_def in config.get('MONITOR_METRICS', []):
        metric_name = metric_def.get('name')
        metric_id = metric_name_to_id.get(metric_name, metric_name)

        metric = {
            'metric_id': metric_id,
            'dataset_type': metric_def.get('dataset_type', 'feedback')
        }
        for optional_key in ('grader_prompt_variables_mapping', 'llm_provider_id'):
            if optional_key in metric_def:
                metric[optional_key] = metric_def[optional_key]

        if 'thresholds' in metric_def:
            metric_thresholds = []
            for threshold_type, value in metric_def['thresholds'].items():
                normalized_type = threshold_type.lower()
                # Per-metric thresholds only carry the two known limit types ...
                if normalized_type in ('lower_limit', 'upper_limit'):
                    metric_thresholds.append({'type': normalized_type, 'default': value})
                # ... while the monitor-level list receives every configured entry.
                thresholds.append({
                    'metric_id': metric_id,
                    'type': normalized_type,
                    'value': value
                })
            if metric_thresholds:
                metric['thresholds'] = metric_thresholds

        metrics.append(metric)

    # The default llm_provider_id lives only inside metrics_configuration.
    parameters = {
        'min_records': config.get('MIN_RECORDS', 10),
        'max_records': config.get('MAX_RECORDS', 100),
        'metrics_configuration': {
            'dataset_type': 'feedback',
            'llm_provider_id': llm_provider_id,
            'metrics': metrics
        }
    }
    if space_id:
        parameters['space_id'] = space_id
    if project_id:
        parameters['project_id'] = project_id

    monitors = {
        custom_monitor_id: {
            'parameters': parameters,
            'thresholds': thresholds
        }
    }

    # Query parameters identify the same prompt setup when polling.
    query_params = {
        'prompt_template_asset_id': config.get('PROMPT_TEMPLATE_ASSET_ID')
    }
    prompt_setup_params = {
        'prompt_template_asset_id': config.get('PROMPT_TEMPLATE_ASSET_ID'),
        'label_column': config.get('LABEL_COLUMN'),
        'operational_space_id': config.get('OPERATIONAL_SPACE_ID'),
        'problem_type': config.get('PROBLEM_TYPE'),
        'input_data_type': config.get('INPUT_DATA_TYPE'),
        'supporting_monitors': monitors
    }
    for key, value in (('project_id', project_id),
                       ('space_id', space_id),
                       ('deployment_id', deployment_id)):
        if value:
            query_params[key] = value
            prompt_setup_params[key] = value

    # Optional RAG / locale parameters are only sent when configured.
    context_fields = config.get('CONTEXT_FIELDS') or config.get('context_fields')
    if context_fields:
        prompt_setup_params['context_fields'] = context_fields
    question_field = config.get('QUESTION_FIELD') or config.get('question_field')
    if question_field:
        prompt_setup_params['question_field'] = question_field
    if config.get('DATA_INPUT_LOCALE'):
        prompt_setup_params['data_input_locale'] = config['DATA_INPUT_LOCALE']
    if config.get('GENERATED_OUTPUT_LOCALE'):
        prompt_setup_params['generated_output_locale'] = config['GENERATED_OUTPUT_LOCALE']

    from ibm_watson_openscale.wos import WOS
    wos = WOS(self._ai_client, self.service_url)
    wos.execute_prompt_setup(**prompt_setup_params)

    return self._poll_prompt_setup(query_params)


def _poll_prompt_setup(self, query_params, max_attempts=30, poll_interval=2):
    """
    Poll the prompt setup API until completion.

    Errors raised while fetching the status are retried until
    ``max_attempts`` is exhausted. A reported ``FAILED`` / ``ERROR`` state is
    terminal and raises immediately instead of being retried.

    :param dict query_params: Query parameters for the prompt setup API
    :param int max_attempts: Maximum number of polling attempts
    :param int poll_interval: Interval between polls in seconds
    :return: Dictionary with subscription_id, service_provider_id, status
        and full_response
    :rtype: dict
    :raises Exception: If the setup reports failure, the status cannot be
        fetched on the final attempt, or it does not finish in time
    """
    import time
    from ibm_watson_openscale.wos import WOS

    wos = WOS(self._ai_client, self.service_url)

    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1

        # Only the fetch itself is retried; state handling happens outside the
        # try so a terminal failure is not swallowed by the retry logic.
        try:
            response = wos.get_prompt_setup(**query_params)
            result = response.result.to_dict() if hasattr(response, 'result') else response
        except Exception as e:
            if is_last_attempt:
                raise Exception(f"Failed to poll prompt setup after {max_attempts} attempts: {str(e)}") from e
            logging.warning(f"Error polling prompt setup (attempt {attempt + 1}/{max_attempts}): {str(e)}")
            time.sleep(poll_interval)
            continue

        status = result.get('status') if isinstance(result, dict) else None
        if not isinstance(status, dict):
            status = {}
        state = status.get('state')

        if state == 'FINISHED':
            return {
                'subscription_id': result.get('subscription_id'),
                'service_provider_id': result.get('service_provider_id'),
                'status': status,
                'full_response': result
            }

        if state in ['FAILED', 'ERROR']:
            error_msg = status.get('message', 'No error message provided')
            failure_info = status.get('failure', {})

            # Prefer the first detailed error entry over the top-level failure fields.
            error_code = None
            error_details_msg = None
            if isinstance(failure_info, dict):
                error_code = failure_info.get('code')
                error_details_msg = failure_info.get('message')
                errors = failure_info.get('errors', [])
                if errors and isinstance(errors, list):
                    first_error = errors[0]
                    if isinstance(first_error, dict):
                        error_code = first_error.get('code', error_code)
                        error_details_msg = first_error.get('message', error_details_msg)

            error_parts = [f"Prompt setup failed: {state}"]
            if error_code:
                error_parts.append(f"Code: {error_code}")
            if error_details_msg:
                error_parts.append(f"Message: {error_details_msg}")
            elif error_msg:
                error_parts.append(f"Message: {error_msg}")

            error_summary = " | ".join(error_parts)
            logging.error(error_summary)
            raise Exception(error_summary)

        logging.info(f"Prompt setup in progress, state: {state}, attempt {attempt + 1}/{max_attempts}")
        if not is_last_attempt:
            time.sleep(poll_interval)

    raise Exception(f"Prompt setup did not complete after {max_attempts} attempts")