# coding: utf-8
# Copyright 2025 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from ibm_cloud_sdk_core import BaseService
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import (
ApplicabilitySelection, IntegratedSystems, MetricThreshold,
MonitorInstanceSchedule, MonitorMetricRequest, MonitorRuntime,
ScheduleStartTime, Target, MonitorTagRequest)
from ibm_watson_openscale.supporting_classes.enums import (
MetricThresholdTypes, TargetTypes)
from ibm_watson_openscale.utils.utils import validate_type
from ibm_watson_openscale.mrm import ModelRiskManagement
[docs]
class SetupCustomMonitor:
def __init__(self, ai_client: BaseService, service_url: str) -> None:
    """
    Bind the helper to an OpenScale client and start with empty state.

    :param BaseService ai_client: Client used for every OpenScale call made by
        this helper; it is checked with ``validate_type`` against ``BaseService``.
    :param str service_url: Service URL, stored unchanged as ``service_url``.
    """
    validate_type(ai_client, 'ai_client', BaseService, True)
    # Nothing has been provisioned yet, so both registries start out empty.
    self.config, self.results = {}, {}
    self.service_url = service_url
    self._ai_client = ai_client
    logging.info("SetupCustomMonitor instance initialized.")
[docs]
def setup_configuration(self, config, function_code):
    """
    Provision a custom metrics provider, custom monitor and monitor instance.

    The supplied ``config`` is layered over built-in defaults, the credentials
    of the custom metrics provider are completed for the detected environment
    and a WML client (``ibm_watsonx_ai.APIClient``) is created. The environment
    is CPD when ``CPD_INFO`` is a key of ``config``, AWS when
    ``AWS_AUTH_ENABLED`` is truthy, and IBM Cloud otherwise. Resources are then
    set up in this order: integrated system (function and deployment), custom
    monitor definition, association of the two, monitor instance and the custom
    dataset for record-level metrics. The merged configuration is kept on
    ``self.config``.

    :param dict config: User configuration. ``SPACE_ID`` and ``SUBSCRIPTION_ID``
        are required. ``CLOUD_API_KEY`` is required unless ``CPD_INFO`` is
        supplied; with ``CPD_INFO`` the fields ``CPD_URL``, ``USERNAME``,
        ``API_KEY`` and ``VERSION`` must resolve to non-empty values
        (``CPD_URL`` falls back to ``OPENSCALE_API_URL`` and ``API_KEY`` to
        ``CLOUD_API_KEY``).
    :param function_code: Function handed to ``_setup_integrated_system`` to be
        stored and deployed as the custom metrics provider.
    :return: ``self.results`` as filled in by the individual setup steps.
    :rtype: dict
    :raises ValueError: If a required configuration value is missing or
        ``CPD_URL`` has no URL scheme.
    """
    try:
        from ibm_watsonx_ai import APIClient
    except Exception:
        logging.error("ibm_watsonx_ai package is not available, Please install the latest version ")
        raise

    logging.basicConfig(level=logging.INFO)
    cloud_api_key = config.get('CLOUD_API_KEY')
    is_cpd = "CPD_INFO" in config
    is_aws = config.get("AWS_AUTH_ENABLED", False)

    # Outside CPD the key is used both for the IAM token payload and for the
    # WML client, so fail with a clear message instead of a TypeError below.
    if not is_cpd and not cloud_api_key:
        raise ValueError("'CLOUD_API_KEY' is missing in the configuration")

    if is_cpd:
        cpd_info = config.get("CPD_INFO")
        if not cpd_info:
            raise ValueError("Missing 'CPD_INFO' in configuration.")
        cpd_url = cpd_info.get("CPD_URL") or config.get("OPENSCALE_API_URL")
        cpd_password = cpd_info.get("API_KEY") or cloud_api_key
        cpd_username = cpd_info.get("USERNAME")
        cpd_version = cpd_info.get("VERSION")
        fields = {
            "CPD_URL": cpd_url,
            "USERNAME": cpd_username,
            "API_KEY": cpd_password,
            "VERSION": cpd_version
        }
        missing_fields = [name for name, value in fields.items() if not value]
        if missing_fields:
            raise ValueError(f"Missing required CPD configuration fields: {', '.join(missing_fields)}")
        from urllib.parse import urlparse
        if not urlparse(cpd_url).scheme:
            raise ValueError("Invalid CPD_URL: Must be a valid URL.")
        cpd_config = {
            "token_info": {
                "url": "{}/icp4d-api/v1/authorize".format(cpd_url),
                "headers": {
                    "Content-Type": "application/json",
                    "Accept": "application/json"
                },
                "payload": {
                    "username": cpd_username,
                    "api_key": cpd_password,
                },
                "method": "post"
            }
        }

    defaults = {
        "DEPLOYMENT_NAME": "Custom Metrics Provider Deployment",
        "PYTHON_FUNCTION_NAME": "Custom Metrics Provider Function",
        "WML_URL": "https://us-south.ml.cloud.ibm.com",
        "IAM_URL": "https://iam.cloud.ibm.com/oidc/token",
        "CUSTOM_METRICS_PROVIDER_NAME": "Custom Metrics Provider",
        "CUSTOM_MONITOR_NAME": "Sample Model Performance",
        "OPENSCALE_API_URL": "https://api.aiopenscale.cloud.ibm.com",
        "DATAMART_ID": "00000000-0000-0000-0000-000000000000",
        "CUSTOM_METRICS_WAIT_TIME": 60,
        "DEPLOYMENT_TYPE": "wml_online",
        "RUNTIME_ENV": "runtime-25.1-py3.12",
        "ENABLE_SCHEDULE": False,
        "SCHEDULE": {
            "repeat_interval": 1,
            "repeat_type": "hour",
            "delay_unit": "minute",
            "delay_time": 5
        },
        "DELETE_CUSTOM_MONITOR": True,
        "DELETE_INTEGRATED_SYSTEM": True,
        "DELETE_CUSTOM_MONITOR_INSTANCE": True,
        "INPUT_DATA_TYPES": ["structured", "unstructured_text", "unstructured_image", "chat"],
        "ALGORITHM_TYPES": ["binary", "regression", "multiclass", "question_answering", "summarization",
                            "retrieval_augmented_generation", "classification", "generation", "extraction"],
        "TAGS": [
            {
                "name": "region",
                "TAG_DESCRIPTION": "customer geographical region"
            }
        ],
        "CUSTOM_METRICS_PROVIDER_CREDENTIALS": {
            "auth_type": "bearer"
        },
        "token_info": {
            "url": "https://iam.cloud.ibm.com/identity/token",
            "headers": {"Content-type": "application/x-www-form-urlencoded"},
            # On CPD the key may live only in CPD_INFO; this default is not
            # used there, so an empty suffix keeps the merge from failing.
            "payload": "grant_type=urn:ibm:params:oauth:grant-type:apikey&response_type=cloud_iam&apikey=" + (cloud_api_key or ""),
            "method": "POST"
        }
    }
    # User values win over defaults; nested dicts are replaced, not merged.
    config = {**defaults, **config}

    if is_cpd:
        creds = config.setdefault("CUSTOM_METRICS_PROVIDER_CREDENTIALS", {})
        token_info = creds.setdefault("token_info", cpd_config.get("token_info", {}))
        config["WML_URL"] = config["OPENSCALE_API_URL"]
        token_info.setdefault("url", f"{config['OPENSCALE_API_URL']}/icp4d-api/v1/authorize")
        payload = token_info.setdefault("payload", {})
        payload.setdefault("api_key", config.get("CLOUD_API_KEY"))
    elif is_aws:
        # Configure AWS/MCSP token generation for the custom metrics provider.
        # The token endpoint is sent a JSON payload; token_field_name names
        # the response field that carries the token.
        aws_iam_url = config.get("AWS_IAM_URL", "https://account-iam.platform.saas.ibm.com/api/2.0/apikeys/token")
        creds = config.setdefault("CUSTOM_METRICS_PROVIDER_CREDENTIALS", {})
        creds["auth_type"] = "bearer"
        creds["token_info"] = {
            "url": aws_iam_url,
            "headers": {"Content-type": "application/json"},
            "payload": {"apikey": config.get("CLOUD_API_KEY")},
            "method": "POST",
            "token_field_name": "token"
        }
        # AWS-specific endpoints override the generic ones when provided.
        if config.get("AWS_WML_URL"):
            config["WML_URL"] = config["AWS_WML_URL"]
        if config.get("AWS_OPENSCALE_API_URL"):
            config["OPENSCALE_API_URL"] = config["AWS_OPENSCALE_API_URL"]
    else:
        if "token_info" not in config.get("CUSTOM_METRICS_PROVIDER_CREDENTIALS", {}):
            config["CUSTOM_METRICS_PROVIDER_CREDENTIALS"]["token_info"] = config["token_info"]

    # Deployment and function names are made subscription-specific when possible.
    if config.get("DEPLOYMENT_NAME") and config.get("SUBSCRIPTION_ID"):
        config["DEPLOYMENT_NAME_PREFIX"] = str(config.get("DEPLOYMENT_NAME")) + "_" + str(config.get("SUBSCRIPTION_ID"))
        config["PYTHON_FUNCTION_NAME"] = str(config.get("PYTHON_FUNCTION_NAME")) + "_" + str(config.get("SUBSCRIPTION_ID"))
    else:
        config["DEPLOYMENT_NAME_PREFIX"] = config.get("DEPLOYMENT_NAME")

    try:
        self.config = config
        if not config.get('SPACE_ID'):
            raise ValueError("'SPACE_ID' is missing in the configuration")
        # Every later step indexes config['SUBSCRIPTION_ID']; reject a missing
        # value here, before any remote resource is created or deleted.
        if not config.get('SUBSCRIPTION_ID'):
            raise ValueError("'SUBSCRIPTION_ID' is missing in the configuration")

        print("Initialising Watson Machine Learning (WML) client.")
        if is_cpd:
            print("Initilising CPD WML")
            credentials = {
                "url": cpd_url,
                "username": cpd_username,
                "apikey": cpd_password,
                "instance_id": "openshift",
                "version": cpd_version
            }
            wml_client = APIClient(credentials)
        elif is_aws:
            print("Initilising AWS WML")
            from ibm_watsonx_ai import Credentials as WatsonxCredentials
            aws_wml_url = config.get("WML_URL")
            aws_platform_url = config.get("AWS_PLATFORM_URL", config.get("AWS_OPENSCALE_API_URL"))
            wml_credentials = WatsonxCredentials(
                url=aws_wml_url,
                api_key=config["CLOUD_API_KEY"],
                platform_url=aws_platform_url
            )
            wml_client = APIClient(credentials=wml_credentials)
        else:
            print("Initilising Cloud WML")
            wml_client = APIClient({
                "url": config["WML_URL"],
                "apikey": config["CLOUD_API_KEY"]
            })
        wml_client.set.default_space(config['SPACE_ID'])
        print(f"Default space set to {config['SPACE_ID']}")

        print("Setting up an Integration System for Custom Metrics Provider")
        integrated_system_id = self._setup_integrated_system(config, wml_client, function_code)
        print(integrated_system_id)

        print("Creating custom monitor.")
        custom_monitor_id = self._create_custom_monitor(config)
        self._associate_integrated_system(integrated_system_id, custom_monitor_id)

        print("Creating monitor instance.")
        monitor_instance_id = self._create_monitor_instance(config, custom_monitor_id, integrated_system_id)
        print(monitor_instance_id)

        print("Creating custom dataset for record-level metrics.")
        custom_dataset_info = self.create_custom_dataset(
            data_mart_id=config['DATAMART_ID'],
            subscription_id=config['SUBSCRIPTION_ID'],
            custom_monitor_id=custom_monitor_id
        )
        print(f"Custom dataset created with ID: {custom_dataset_info['custom_dataset_id']}")
        logging.info("Custom monitor setup completed successfully")
        return self.results
    except Exception as e:
        logging.error(f"Setup failed: {str(e)}")
        raise
def _cleanup_existing_deployment(self, wml_client, deployment_name):
deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower()
if deployment_type == "wml_online":
try:
print(f"Performing wml_online deployment Cleanup for: {deployment_name}")
deployments = wml_client.deployments.get_details()
for deployment in deployments["resources"]:
if deployment["metadata"]["name"] == deployment_name:
deployment_id = deployment["metadata"]["id"]
print(f"Deleting wml_online deployment: {deployment_id} ")
wml_client.deployments.delete(deployment_id)
asset_id = deployment["entity"].get("asset", {}).get("id")
if asset_id:
print(f"Deleting associated asset: {asset_id}")
wml_client.repository.delete(asset_id)
except Exception as e:
print(f"Error during wml_online deployment clenaup {deployment_name} : {e}")
elif deployment_type == "wml_batch":
try:
print(f"Performing Batch deployment Cleanup for: {deployment_name}")
deployments = wml_client.deployments.get_details()
for deployment in deployments["resources"]:
if deployment["metadata"]["name"] == deployment_name or deployment["metadata"]["name"] == deployment_name + '_WRAPPER':
deployment_id = deployment["metadata"]["id"]
print(f"Deleting Batch deployment: {deployment_id} ")
wml_client.deployments.delete(deployment_id)
asset_id = deployment["entity"].get("asset", {}).get("id")
if asset_id:
print(f"Deleting associated asset: {asset_id}")
wml_client.repository.delete(asset_id)
except Exception as e:
print(f"Error during Batch deployment clenaup {deployment_name} : {e}")
def _create_function(self, wml_client, config, code):
deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower()
software_spec_id = wml_client.software_specifications.get_id_by_name(
config['RUNTIME_ENV'])
try:
function_props = {
wml_client.repository.FunctionMetaNames.NAME: config['PYTHON_FUNCTION_NAME'],
wml_client.repository.FunctionMetaNames.SOFTWARE_SPEC_ID: software_spec_id
}
function_artifact = wml_client.repository.store_function(
meta_props=function_props, function=code)
function_id = wml_client.repository.get_function_id(function_artifact)
self.results["function_id"] = function_id
return function_id
except Exception as e:
print(f"Error during create function of deployment type {deployment_type} : {e}")
def _deploy_function(self, wml_client, config, function_id):
deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower()
try:
self.hardware_spec_id = wml_client.hardware_specifications.get_id_by_name('M')
if deployment_type == "wml_online":
print(f"Deploy function as ONLINE : {config['DEPLOYMENT_NAME']}")
deploy_meta = {
wml_client.deployments.ConfigurationMetaNames.NAME: config["DEPLOYMENT_NAME_PREFIX"],
wml_client.deployments.ConfigurationMetaNames.ONLINE: {},
wml_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {"id": self.hardware_spec_id}
}
elif deployment_type == "wml_batch":
print(f"Deploy function as BATCH : {config['DEPLOYMENT_NAME']}")
deploy_meta = {
wml_client.deployments.ConfigurationMetaNames.NAME: config["DEPLOYMENT_NAME_PREFIX"],
wml_client.deployments.ConfigurationMetaNames.BATCH: {},
wml_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: { "name": "S", "num_nodes": 1}
}
deployment = wml_client.deployments.create(function_id, meta_props=deploy_meta)
deployment_id = wml_client.deployments.get_uid(deployment)
deployment_details = wml_client.deployments.get_details(deployment_id)
created_at = deployment_details['metadata']['created_at']
current_date = created_at.split("T")[0] if "T" in created_at else ""
if deployment_type == "wml_online":
scoring_url = wml_client.deployments.get_scoring_href(deployment)
scoring_url += f"?version={current_date}"
elif deployment_type == "wml_batch":
scoring_url = config['WML_URL'] + '/ml/v4/deployment_jobs?version='+current_date
self.results.update({"deployment_id": deployment_id, "scoring_url": scoring_url})
return deployment_id, scoring_url
except Exception as e:
print(f"Error during Deploy function of deployment type {deployment_type} : {e}")
def _setup_integrated_system(self, config, wml_client, function_code):
    """
    Ensure an integrated system exists for the custom metrics provider.

    The system name is ``CUSTOM_METRICS_PROVIDER_NAME`` suffixed with the
    subscription id. A system already registered under that name is either
    reused as is (``DELETE_INTEGRATED_SYSTEM`` falsy) or replaced: its
    deployment is cleaned up, the function is stored and deployed again and
    the old system is deleted. A new system pointing at the scoring URL is
    then registered. The system id and name are written to ``self.results``.

    :param dict config: Merged configuration.
    :param wml_client: WML client used for function and deployment handling.
    :param function_code: Function to store and deploy.
    :return: Id of the reused or newly created integrated system.
    """
    provider_name = f"{config['CUSTOM_METRICS_PROVIDER_NAME']}_{config['SUBSCRIPTION_ID']}"
    print("CUSTOM_METRICS_PROVIDER_NAME:", provider_name)

    def publish_function():
        # Store the function, deploy it and keep the deployment id on the instance.
        print("Creating custom function.")
        stored_id = self._create_function(wml_client, config, function_code)
        self.deployment_id, endpoint = self._deploy_function(wml_client, config, stored_id)
        return endpoint

    scoring_url = None
    registered = IntegratedSystems(self._ai_client).list().result.integrated_systems
    for existing in registered:
        if existing.entity.name != provider_name:
            continue
        if not config['DELETE_INTEGRATED_SYSTEM']:
            print("delete_integrated_system is False")
            self.results["integrated_system_id"] = existing.metadata.id
            self.results["custom_metrics_provider_name"] = existing.entity.name
            return existing.metadata.id
        print("delete_integrated_system is True")
        print(f"Cleaning up existing deployment {config['DEPLOYMENT_NAME']}.")
        self._cleanup_existing_deployment(wml_client, config["DEPLOYMENT_NAME_PREFIX"])
        scoring_url = publish_function()
        IntegratedSystems(self._ai_client).delete(integrated_system_id=existing.metadata.id)

    # No matching system was found (for example it was deleted from the UI),
    # so nothing has been deployed yet.
    if scoring_url is None:
        scoring_url = publish_function()

    connection = {
        "display_name": provider_name,
        "endpoint": scoring_url
    }
    created = self._ai_client.integrated_systems.add(
        name=provider_name,
        description="Custom metrics provider system",
        type="custom_metrics_provider",
        credentials=config['CUSTOM_METRICS_PROVIDER_CREDENTIALS'],
        connection=connection
    ).result
    self.results["integrated_system_id"] = created.metadata.id
    self.results["custom_metrics_provider_name"] = created.entity.name
    return created.metadata.id
def _create_custom_monitor(self, config):
is_schedule = "SCHEDULE" in config
if is_schedule:
schedule_info = config["SCHEDULE"]
REPEAT_INTERVAL = schedule_info.get("REPEAT_INTERVAL") or schedule_info.get("repeat_interval")
REPEAT_TYPE = schedule_info.get("REPEAT_TYPE") or schedule_info.get("repeat_type")
delay_unit = schedule_info.get("DELAY_UNIT") or schedule_info.get("delay_unit")
delay_time = schedule_info.get("DELAY_TIME") or schedule_info.get("delay_time")
# Step 1: Check if monitor already exists
existing_monitors = self._ai_client.monitor_definitions.list().result.monitor_definitions
for monitor in existing_monitors:
if monitor.entity.name == config['CUSTOM_MONITOR_NAME']:
if config.get('DELETE_CUSTOM_MONITOR', False):
print(f"Deleting existing monitor: {monitor.entity.name}")
# Delete associated custom dataset first
self.delete_custom_dataset(monitor.metadata.id, config['SUBSCRIPTION_ID'])
# Then delete the monitor
self._ai_client.monitor_definitions.delete(monitor.metadata.id,background_mode=False)
else:
print(f"Reusing existing monitor: {monitor.entity.name}")
self.results["custom_monitor_id"] = monitor.metadata.id
return monitor.metadata.id
# Step 2: Set applicability
problem_type_selection = ApplicabilitySelection(problem_type=config['ALGORITHM_TYPES'])
input_data_type_selection = ApplicabilitySelection(input_data_type=config['INPUT_DATA_TYPES'])
# Step 3: Dynamically build metric definitions
metrics = []
for metric_cfg in config.get('MONITOR_METRICS', []):
metric_name = metric_cfg.get('name')
thresholds = []
threshold_cfg = metric_cfg.get('thresholds', {})
for threshold_type, value in threshold_cfg.items():
if threshold_type.lower() == 'lower_limit':
thresholds.append(MetricThreshold(
type=MetricThresholdTypes.LOWER_LIMIT,
default=value
))
elif threshold_type.lower() == 'upper_limit':
thresholds.append(MetricThreshold(
type=MetricThresholdTypes.UPPER_LIMIT,
default=value
))
else:
print(f"Warning: Unknown threshold type '{threshold_type}' for metric '{metric_name}'")
metrics.append(MonitorMetricRequest(
name=metric_name,
applies_to=problem_type_selection,
thresholds=thresholds
))
# Step 4: Dynamically build tag definitions
tags = []
tag_configurations = config.get('TAGS', [])
for tag in tag_configurations:
name = tag.get('name')
description = tag.get('TAG_DESCRIPTION')
# Validate if both name and description exist
if name and description:
tags.append(MonitorTagRequest(name=name, description=description))
else:
print(f"Invalid tag configuration: {tag}")
# Step 5: Optional scheduling
if config.get('ENABLE_SCHEDULE', False):
schedule = MonitorInstanceSchedule(
repeat_interval=REPEAT_INTERVAL,
repeat_unit=REPEAT_TYPE,
start_time=ScheduleStartTime(
type="relative", delay_unit=delay_unit, delay=delay_time)
)
monitor_runtime = MonitorRuntime(type="custom_metrics_provider")
else:
schedule = None
monitor_runtime = None
# Step 6: Create the monitor
monitor_def = self._ai_client.monitor_definitions.add(
name=config['CUSTOM_MONITOR_NAME'],
metrics=metrics,
tags=tags,
schedule=schedule,
applies_to=input_data_type_selection,
monitor_runtime=monitor_runtime,
background_mode=False
).result
# Step 7: Save and return monitor ID
monitor_id = monitor_def.metadata.id
self.results["custom_monitor_id"] = monitor_id
print(f"Custom monitor created with ID: {monitor_id}")
return monitor_id
def _create_monitor_instance(self,config, custom_monitor_id, integrated_system_id):
deployment_type = self.config.get("DEPLOYMENT_TYPE","wml_online").lower()
existing_instances = self._ai_client.monitor_instances.list().result.monitor_instances
monitor_instance_id = None
try:
for instance in existing_instances:
if instance.entity.monitor_definition_id == custom_monitor_id:
monitor_instance_id = instance.metadata.id
if config.get("DELETE_CUSTOM_MONITOR_INSTANCE", False):
print(f"Deleting existing monitor instance: {monitor_instance_id}")
self._ai_client.monitor_instances.delete(monitor_instance_id,background_mode=False)
monitor_instance_id = None
break # Proceed to create a new one
else:
print(f"Updating existing monitor instance: {monitor_instance_id}")
if deployment_type == "wml_online":
patch_payload = [
{
"op": "replace",
"path": "/parameters",
"value": {
"custom_metrics_provider_id": integrated_system_id,
"custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"],
"enable_custom_metric_runs": True
}
}
]
elif deployment_type == "wml_batch":
custom_metrics_wait_time = config.get("custom_metrics_wait_time", 120)
patch_payload = [
{
"op": "replace",
"path": "/parameters",
"value": {
"custom_metrics_provider_id": integrated_system_id,
"custom_metrics_provider_type": "wml_batch",
"custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"],
"space_id": config['SPACE_ID'],
"deployment_id": self.deployment_id,
"hardware_spec_id": self.hardware_spec_id,
"enable_custom_metric_runs": True
}
}
]
self._ai_client.monitor_instances.update(
monitor_instance_id,
patch_payload,
update_metadata_only=True
)
self.results["custom_monitor_instance_id"] = monitor_instance_id
return monitor_instance_id
# No instance found or it was deleted — create a new one
print(f"Creating new monitor instance for monitor definition: {custom_monitor_id}")
target = Target(
target_type=TargetTypes.SUBSCRIPTION,
target_id=config["SUBSCRIPTION_ID"]
)
if deployment_type == "wml_online":
parameters = {
"custom_metrics_provider_id": integrated_system_id,
"custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"],
"enable_custom_metric_runs": True
}
elif deployment_type == "wml_batch":
parameters = {
"custom_metrics_provider_id": integrated_system_id,
"custom_metrics_provider_type": "wml_batch",
"custom_metrics_wait_time": config["CUSTOM_METRICS_WAIT_TIME"],
"space_id": config['SPACE_ID'],
"deployment_id": self.deployment_id,
"hardware_spec_id": self.hardware_spec_id,
"enable_custom_metric_runs": True
}
instance = self._ai_client.monitor_instances.create(
data_mart_id=config["DATAMART_ID"],
background_mode=False,
monitor_definition_id=custom_monitor_id,
target=target,
parameters=parameters
).result
monitor_instance_id = instance.metadata.id
print(f"Monitor instance created: {monitor_instance_id}")
self.results["custom_monitor_instance_id"] = monitor_instance_id
return monitor_instance_id
except Exception as e:
print(f"Error creating monitor instance : {e}")
def _associate_integrated_system(self, integrated_system_id, custom_monitor_id):
response = self._ai_client.integrated_systems.update(integrated_system_id, [{
"op": "add",
"path": "/parameters",
"value": {"monitor_definition_ids": [custom_monitor_id]}
}])
result = response.result
return result
[docs]
def get_results(self):
return self.results
[docs]
def get_config(self):
return self.config
[docs]
def get_monitor_instance_config(self,config):
target = config.get("SUBSCRIPTION_ID")
monitor_name = config.get("CUSTOM_MONITOR_NAME","Sample Model Performance")
existing_monitors = self._ai_client.monitor_definitions.list().result.monitor_definitions
for monitor in existing_monitors:
if monitor.entity.name == monitor_name:
monitor_definition_id = monitor.metadata.id
print(f"Monitor instance details picking for Subscription: {target} ,monitor definition: {monitor_definition_id} ")
result = self._ai_client.monitor_instances.list(target_target_id=target,
monitor_definition_id=monitor_definition_id).result.to_dict()
return result
[docs]
def get_custom_monitor_configuration(self,config):
result = {}
monitor_instance = self.get_results()
if monitor_instance:
result = monitor_instance
else:
res = self.get_monitor_instance_config(config=config)
monitor_instances = res.get("monitor_instances") if res else None
if monitor_instances and isinstance(monitor_instances, list) and monitor_instances and isinstance(monitor_instances[0], dict):
instance = monitor_instances[0]
metadata = instance.get("metadata", {})
entity = instance.get("entity", {})
parameters = entity.get("parameters", {})
result["custom_monitor_instance_id"] = metadata.get("id")
result["custom_monitor_id"] = entity.get("monitor_definition_id")
# Get custom_metrics_provider_id (used for both custom monitors and LLMaaJ)
custom_metrics_provider_id = parameters.get("custom_metrics_provider_id")
result["integrated_system_id"] = custom_metrics_provider_id
# For LLMaaJ, also set llm_provider_id for consistency
# Check if this is an LLMaaJ setup by looking for metrics_configuration
metrics_config = parameters.get("metrics_configuration", {})
if metrics_config and metrics_config.get("llm_provider_id"):
result["llm_provider_id"] = metrics_config.get("llm_provider_id")
elif custom_metrics_provider_id:
# If metrics_configuration exists but no llm_provider_id, use custom_metrics_provider_id
result["llm_provider_id"] = custom_metrics_provider_id
result["deployment_id"] = parameters.get("deployment_id")
result["hardware_spec_id"] = parameters.get("hardware_spec_id")
result["space_id"] = parameters.get("space_id")
result["project_id"] = parameters.get("project_id")
result["custom_metrics_provider_type"] = parameters.get("custom_metrics_provider_type")
result["custom_metrics_wait_time"] = parameters.get("custom_metrics_wait_time")
result["custom_metrics_provider_name"] = parameters.get("custom_metrics_provider_name")
# Add LLMaaJ-specific fields if present
if metrics_config:
result["metrics_configuration"] = metrics_config
result["min_records"] = parameters.get("min_records")
result["max_records"] = parameters.get("max_records")
# Add RAG-specific fields if present (for RAG task type)
if parameters.get("context_fields"):
result["context_fields"] = parameters.get("context_fields")
if parameters.get("question_field"):
result["question_field"] = parameters.get("question_field")
if parameters.get("meta_fields"):
result["meta_fields"] = parameters.get("meta_fields")
if parameters.get("data_properties"):
result["data_properties"] = parameters.get("data_properties")
# Fetch custom_dataset_id by querying data_sets API
custom_monitor_id = result.get("custom_monitor_id")
subscription_id = config.get("SUBSCRIPTION_ID")
if custom_monitor_id and subscription_id:
try:
# Search for custom dataset managed by this monitor
data_sets = self._ai_client.data_sets.list(
target_target_id=subscription_id,
type='custom'
).result.data_sets
table_name_suffix = f"{custom_monitor_id}_{subscription_id}"
for dataset in data_sets:
if hasattr(dataset.entity, 'location') and hasattr(dataset.entity.location, 'table_name'):
if dataset.entity.location.table_name.endswith(table_name_suffix):
result["custom_dataset_id"] = dataset.metadata.id
result["custom_dataset_table_name"] = dataset.entity.location.table_name
break
except Exception as e:
print(f"Warning: Could not fetch custom dataset info: {str(e)}")
else:
print("Monitor instance config is missing or improperly structured.")
return result
[docs]
def create_custom_dataset(self, data_mart_id: str, subscription_id: str, custom_monitor_id: str) -> dict:
    """
    Create the custom dataset that stores record-level metrics of a custom monitor.

    The dataset is created with five default columns (``record_id``,
    ``record_timestamp``, ``run_id``, ``computed_on``, ``data_set_id``) and its
    table is named ``{custom_monitor_id}_{subscription_id}``. The dataset id
    and table name are also stored in ``self.results``.

    :param str data_mart_id: Watson OpenScale DataMart GUID
    :param str subscription_id: ID of the subscription to be monitored
    :param str custom_monitor_id: Custom monitor definition ID
    :return: Dictionary with ``custom_dataset_id``, ``table_name``,
        ``custom_monitor_id``, ``subscription_id`` and the raw ``response``.
    :rtype: dict
    :raises Exception: Any error raised while creating the dataset; it is
        printed and re-raised.

    Example Usage:
    >>> custom_dataset_info = wos_client.custom_monitor.create_custom_dataset(
            data_mart_id='00000000-0000-0000-0000-000000000000',
            subscription_id='subscription_id_here',
            custom_monitor_id='custom_monitor_id_here'
        )
    """
    from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Target
    from ibm_watson_openscale.supporting_classes.enums import TargetTypes

    def column(name, type_name, metadata):
        # None of the default columns may be null.
        return {"metadata": metadata, "name": name, "nullable": False, "type": type_name}

    # Schema made up of the default columns for record-level metrics.
    data_schema = {
        "fields": [
            column("record_id", "string", {"modeling_role": "record-id", "primary_key": True}),
            column("record_timestamp", "timestamp", {"modeling_role": "record-timestamp"}),
            column("run_id", "string", {"columnInfo": {"columnIndex": "DEFAULT"}}),
            column("computed_on", "string", {}),
            column("data_set_id", "string", {}),
        ],
        "type": "struct"
    }

    subscription_target = Target(
        target_type=TargetTypes.SUBSCRIPTION,
        target_id=subscription_id
    )
    # Naming convention: {custom_monitor_id}_{subscription_id}
    table_name = f"{custom_monitor_id}_{subscription_id}"

    try:
        print(f"Creating custom dataset with table name: {table_name}")
        response = self._ai_client.data_sets.add(
            target=subscription_target,
            name=custom_monitor_id,
            description=f"{custom_monitor_id} record-level metrics table",
            type='custom',
            data_schema=data_schema,
            data_mart_id=data_mart_id,
            managed_by=custom_monitor_id,
            location={"table_name": table_name},
            background_mode=True
        )
        created = response.result
        custom_dataset_id = created.metadata.id
        custom_dataset_info = {
            "custom_dataset_id": custom_dataset_id,
            "table_name": table_name,
            "custom_monitor_id": custom_monitor_id,
            "subscription_id": subscription_id,
            "response": created.to_dict()
        }
        print(f"Custom dataset created successfully with ID: {custom_dataset_id}")
        # Kept on the instance so later calls can find the dataset again.
        self.results["custom_dataset_id"] = custom_dataset_id
        self.results["custom_dataset_table_name"] = table_name
        return custom_dataset_info
    except Exception as e:
        print(f"Error creating custom dataset: {str(e)}")
        raise
[docs]
def delete_custom_dataset(self, custom_monitor_id: Optional[str] = None, subscription_id: Optional[str] = None, custom_dataset_id: Optional[str] = None):
"""
Delete the custom dataset associated with a custom monitor.
:param str custom_monitor_id: (Optional) Custom monitor definition ID. Required if custom_dataset_id is not provided.
:param str subscription_id: (Optional) ID of the subscription. Required if custom_dataset_id is not provided.
:param str custom_dataset_id: (Optional) Custom dataset ID. If provided, directly deletes the dataset without searching.
Example usage:
>>> # Delete by searching for the dataset
>>> wos_client.custom_monitor.delete_custom_dataset(
custom_monitor_id='custom_monitor_id_here',
subscription_id='subscription_id_here'
)
>>> # Delete directly using dataset ID (custom_monitor_id and subscription_id not required)
>>> wos_client.custom_monitor.delete_custom_dataset(
custom_dataset_id='dataset_id_here'
)
"""
try:
# If custom_dataset_id is provided, directly delete the dataset
if custom_dataset_id:
print(f"Deleting custom dataset with ID: {custom_dataset_id}")
self._ai_client.data_sets.delete(data_set_id=custom_dataset_id, force=True, background_mode=False)
print(f"Custom dataset {custom_dataset_id} deleted successfully")
return
# Otherwise, search for the dataset by table name
# Validate required parameters for search-based deletion
if not custom_monitor_id or not subscription_id:
raise ValueError("custom_monitor_id and subscription_id are required when custom_dataset_id is not provided")
# Construct the expected table name based on naming convention
table_name_suffix = f"{custom_monitor_id}_{subscription_id}"
# Find and delete the dataset
data_sets = self._ai_client.data_sets.list(
target_target_id=subscription_id,
type='custom'
).result.data_sets
for dataset in data_sets:
if hasattr(dataset.entity, 'location') and hasattr(dataset.entity.location, 'table_name'):
# Use endswith to match table name suffix (handles datamart prefix)
if dataset.entity.location.table_name.endswith(table_name_suffix):
dataset_id = dataset.metadata.id
print(f"Deleting custom dataset: {dataset_id} (table: {dataset.entity.location.table_name})")
self._ai_client.data_sets.delete(data_set_id=dataset_id, force=True, background_mode=False)
print(f"Custom dataset {dataset_id} deleted successfully")
return
print(f"No custom dataset found with table name suffix: {table_name_suffix}")
except Exception as e:
print(f"Warning: Error deleting custom dataset: {str(e)}")
[docs]
def setup_llm_judge_configuration(self, config):
    """
    Configure LLM-as-a-Judge (LLMaaJ) evaluation for a custom monitor.

    The steps are executed in this order:

    1. Validate ``config`` and merge ``CUSTOM_MONITOR_CONFIG`` on top of it.
    2. Create (or reuse) the monitor definition with LLM grader metrics.
    3. Create the LLM provider integrated system.
    4. Run prompt setup and read ``subscription_id`` / ``service_provider_id``
       from its response.
    5. Look up the monitor instance ID.
    6. Search (up to 5 times, 2 seconds apart) for the custom dataset whose
       table name ends with ``<custom_monitor_id>_<subscription_id>``.
    7. If both a dataset and a monitor instance were found, patch the
       monitor instance with ``custom_dataset_id`` (best effort).

    :param dict config: Configuration with LLM_PROVIDER_CONFIG, CUSTOM_MONITOR_CONFIG,
        PROMPT_TEMPLATE_ASSET_ID, LABEL_COLUMN, OPERATIONAL_SPACE_ID, PROBLEM_TYPE,
        INPUT_DATA_TYPE, DATAMART_ID, and optional parameters.
    :return: ``self.results`` updated with llm_provider, llm_provider_id,
        custom_monitor_id, subscription_id, service_provider_id,
        monitor_instance_id, custom_dataset_id, custom_dataset_table_name
        and prompt_setup_response.
    :rtype: dict
    :raises ValueError: If the prompt setup response carries no subscription_id.
    """
    import time

    logging.info("Starting LLMaaJ custom metrics setup")
    self._validate_llmaj_config(config)

    # Monitor-level settings override same-named top-level keys.
    monitor_overrides = config.get('CUSTOM_MONITOR_CONFIG', {})
    merged_config = {**config, **monitor_overrides}
    self.config = merged_config

    # Step 1: monitor definition with LLM grader metrics.
    custom_monitor_id = self.create_llmaj_monitor_definition(merged_config)
    logging.info(f"Created custom monitor definition: {custom_monitor_id}")

    # Step 2: integrated system (generative_ai_evaluator).
    provider_config = config['LLM_PROVIDER_CONFIG']
    llm_provider = provider_config.get('type', '').lower()
    llm_provider_id = self.create_llm_provider(
        llm_provider,
        provider_config,
        merged_config.get('DELETE_LLM_PROVIDER', True),
    )
    logging.info(f"Created LLM provider: {llm_provider_id}")

    # Steps 3 and 4: prompt setup and the identifiers it produced.
    prompt_setup_response = self._invoke_prompt_setup(
        merged_config, custom_monitor_id, llm_provider_id
    )
    subscription_id = prompt_setup_response.get('subscription_id')
    service_provider_id = prompt_setup_response.get('service_provider_id')
    if not subscription_id:
        raise ValueError("subscription_id not found in prompt_setup response")
    logging.info(f"Prompt setup completed. Subscription ID: {subscription_id}, Service Provider ID: {service_provider_id}")

    # Step 5: monitor instance ID.
    monitor_instance_id = self._get_monitor_instance_id(subscription_id, custom_monitor_id)
    logging.info(f"Monitor instance ID: {monitor_instance_id}")

    # Step 6: locate the custom dataset, retrying while it is not listed yet.
    custom_dataset_id = None
    custom_dataset_table_name = None
    max_retries = 5
    retry_delay = 2  # seconds
    table_name_suffix = f"{custom_monitor_id}_{subscription_id}"
    for attempt in range(max_retries):
        attempts_left = attempt < max_retries - 1
        try:
            candidates = self._ai_client.data_sets.list(
                target_target_id=subscription_id,
                type='custom'
            ).result.data_sets
            # First dataset whose table name carries the expected suffix.
            match = next(
                (
                    candidate for candidate in candidates
                    if hasattr(candidate.entity, 'location')
                    and hasattr(candidate.entity.location, 'table_name')
                    and candidate.entity.location.table_name.endswith(table_name_suffix)
                ),
                None,
            )
            if match is not None:
                custom_dataset_id = match.metadata.id
                custom_dataset_table_name = match.entity.location.table_name
                logging.info(f"Custom dataset ID found: {custom_dataset_id}")
            if custom_dataset_id:
                break  # Found the dataset, exit retry loop
            if attempts_left:
                logging.info(f"Custom dataset not found yet, retrying in {retry_delay}s (attempt {attempt + 1}/{max_retries})")
                time.sleep(retry_delay)
            else:
                logging.warning(f"Custom dataset not found after {max_retries} attempts")
        except Exception as err:
            if attempts_left:
                logging.warning(f"Error fetching custom dataset (attempt {attempt + 1}/{max_retries}): {str(err)}")
                time.sleep(retry_delay)
            else:
                logging.warning(f"Could not fetch custom dataset info after {max_retries} attempts: {str(err)}")

    # Step 7: record the dataset on the monitor instance; failures are only logged.
    if custom_dataset_id and monitor_instance_id:
        try:
            logging.info(f"Updating monitor instance {monitor_instance_id} with custom_dataset_id: {custom_dataset_id}")
            self._ai_client.monitor_instances.update(
                monitor_instance_id=monitor_instance_id,
                patch_document=[
                    {
                        "op": "add",
                        "path": "/parameters/custom_dataset_id",
                        "value": custom_dataset_id
                    }
                ]
            )
            logging.info("Monitor instance updated successfully with custom_dataset_id")
        except Exception as err:
            logging.warning(f"Failed to update monitor instance with custom_dataset_id: {str(err)}")

    summary = {
        'llm_provider': llm_provider,
        'llm_provider_id': llm_provider_id,
        'custom_monitor_id': custom_monitor_id,
        'subscription_id': subscription_id,
        'service_provider_id': service_provider_id,
        'monitor_instance_id': monitor_instance_id,
        'custom_dataset_id': custom_dataset_id,
        'custom_dataset_table_name': custom_dataset_table_name,
        'prompt_setup_response': prompt_setup_response
    }
    self.results.update(summary)
    logging.info("LLMaaJ custom metrics setup completed successfully")
    return self.results
[docs]
def create_llmaj_monitor_definition(self, config):
    """
    Create (or reuse) a monitor definition for LLMaaJ.

    An existing definition named ``config['CUSTOM_MONITOR_NAME']`` is reused
    unless ``DELETE_CUSTOM_MONITOR`` is truthy, in which case its monitor
    instances, their custom datasets and the definition itself are deleted
    (best effort, failures are logged) before a new one is created.

    When ``ENABLE_SCHEDULE`` is ``True`` and ``SCHEDULE`` is absent, a default
    schedule is written into ``config`` (the caller's dict is modified).

    :param dict config: Configuration dictionary. Reads CUSTOM_MONITOR_NAME,
        MONITOR_METRICS, PROBLEM_TYPE, DELETE_CUSTOM_MONITOR, ENABLE_SCHEDULE
        and SCHEDULE.
    :return: Monitor definition ID
    :rtype: str
    """
    is_schedule = "SCHEDULE" in config
    # Set default SCHEDULE if ENABLE_SCHEDULE is True but SCHEDULE is not configured
    if config.get('ENABLE_SCHEDULE') is True and not is_schedule:
        config["SCHEDULE"] = {
            "repeat_interval": 60,
            "repeat_type": "minute",
            "delay_unit": "minute",
            "delay_time": 5
        }
        is_schedule = True
        logging.info("ENABLE_SCHEDULE is True but SCHEDULE not configured. Using default schedule values.")

    repeat_interval = repeat_type = delay_unit = delay_time = None
    if is_schedule:
        schedule_info = config["SCHEDULE"]

        def _schedule_setting(name):
            # Upper-case key wins, lower-case is the fallback. Compare against
            # None (not truthiness) so a legitimate 0, e.g. DELAY_TIME: 0,
            # is not silently discarded.
            value = schedule_info.get(name.upper())
            return schedule_info.get(name.lower()) if value is None else value

        repeat_interval = _schedule_setting("repeat_interval")
        repeat_type = _schedule_setting("repeat_type")
        delay_unit = _schedule_setting("delay_unit")
        delay_time = _schedule_setting("delay_time")

    # Check if monitor already exists
    existing_monitors = self._ai_client.monitor_definitions.list().result.monitor_definitions
    for monitor in existing_monitors:
        if monitor.entity.name != config['CUSTOM_MONITOR_NAME']:
            continue
        if not config.get('DELETE_CUSTOM_MONITOR', False):
            logging.info(f"Reusing existing monitor: {monitor.entity.name}")
            self.results["custom_monitor_id"] = monitor.metadata.id
            return monitor.metadata.id

        logging.info(f"Deleting existing monitor: {monitor.entity.name}")
        monitor_id = monitor.metadata.id
        # Monitor instances and their custom datasets must go before the definition.
        try:
            monitor_instances = self._ai_client.monitor_instances.list(
                monitor_definition_id=monitor_id
            ).result.monitor_instances
            for instance in monitor_instances:
                instance_id = instance.metadata.id
                subscription_id = instance.entity.target.target_id
                logging.info(f"Deleting monitor instance: {instance_id} for subscription: {subscription_id}")
                # Delete custom dataset first
                try:
                    self.delete_custom_dataset(monitor_id, subscription_id)
                except Exception as e:
                    logging.warning(f"Error deleting custom dataset: {str(e)}")
                # Then delete monitor instance
                try:
                    self._ai_client.monitor_instances.delete(instance_id, background_mode=False)
                    logging.info(f"Monitor instance {instance_id} deleted successfully")
                except Exception as e:
                    logging.warning(f"Error deleting monitor instance: {str(e)}")
        except Exception as e:
            logging.warning(f"Error cleaning up monitor instances: {str(e)}")
        # Finally, delete the monitor definition. A failure is logged and
        # creation proceeds, since the definition may already be gone.
        try:
            self._ai_client.monitor_definitions.delete(monitor_id, background_mode=False)
            logging.info(f"Monitor definition {monitor_id} deleted successfully")
        except Exception as e:
            logging.warning(f"Error deleting monitor definition (may already be deleted): {str(e)}")

    # Applicability is set at monitor level only (input_data_type + problem_type).
    supported_problem_types = [
        'question_answering',
        'summarization',
        'generation',
        'extraction',
        'retrieval_augmented_generation',
        'classification'
    ]
    if 'PROBLEM_TYPE' in config:
        problem_type = [config['PROBLEM_TYPE']]
    else:
        problem_type = supported_problem_types
        logging.info(f"PROBLEM_TYPE not provided, defaulting to all supported types: {supported_problem_types}")
    monitor_level_applicability = ApplicabilitySelection(
        input_data_type=['unstructured_text'],
        problem_type=problem_type
    )

    # Build metric definitions from MONITOR_METRICS.
    # applies_to is deliberately NOT set on individual metrics.
    metrics = []
    for metric_cfg in config.get('MONITOR_METRICS', []):
        metric_name = metric_cfg.get('name')
        metric_description = metric_cfg.get('description', '')
        # Computation details may be nested under 'computation' or sit at the metric root.
        computation_cfg = metric_cfg.get('computation', {})
        prompt = computation_cfg.get('prompt') or metric_cfg.get('prompt')
        grading_options = computation_cfg.get('grading_options') or metric_cfg.get('grading_options')

        thresholds = []
        for threshold_type, value in metric_cfg.get('thresholds', {}).items():
            normalized_type = threshold_type.lower()
            if normalized_type == 'lower_limit':
                thresholds.append(MetricThreshold(
                    type=MetricThresholdTypes.LOWER_LIMIT,
                    default=value
                ))
            elif normalized_type == 'upper_limit':
                thresholds.append(MetricThreshold(
                    type=MetricThresholdTypes.UPPER_LIMIT,
                    default=value
                ))
            else:
                logging.warning(f"Unknown threshold type '{threshold_type}' for metric '{metric_name}'")

        metric_request = MonitorMetricRequest(
            name=metric_name,
            description=metric_description,
            thresholds=thresholds
        )
        # Attach the llm_grader computation only when both prompt and grading options exist.
        if prompt is not None and grading_options is not None:
            # Imported lazily so metrics without a computation do not need these classes.
            from ibm_watson_openscale.base_classes.watson_open_scale_v2 import (
                MetricComputationSchemaLLMGraderComputation,
                GradingOption
            )
            # Dict entries become GradingOption objects; anything else is passed through.
            grading_option_objects = [
                GradingOption(
                    name=opt.get('name'),
                    value=opt.get('value'),
                    description=opt.get('description')
                ) if isinstance(opt, dict) else opt
                for opt in grading_options
            ]
            metric_request.computation = MetricComputationSchemaLLMGraderComputation(
                type="llm_grader",
                prompt=prompt,
                grading_options=grading_option_objects
            )
        metrics.append(metric_request)

    # Build tags - use default tag for LLMaaJ (required by API)
    tags = [MonitorTagRequest(
        name="computed_on",
        description="used to set the dataset type"
    )]
    logging.info(f"Creating monitor definition with {len(tags)} tags and {len(metrics)} metrics")

    # A schedule is attached only when it is both enabled and configured.
    if config.get('ENABLE_SCHEDULE', False) and is_schedule:
        schedule = MonitorInstanceSchedule(
            repeat_interval=repeat_interval,
            repeat_unit=repeat_type,
            start_time=ScheduleStartTime(
                type="relative", delay_unit=delay_unit, delay=delay_time)
        )
        logging.info(f"Schedule configured: {repeat_interval} {repeat_type}, delay: {delay_time} {delay_unit}")
    else:
        schedule = None
        if config.get('ENABLE_SCHEDULE', False) and not is_schedule:
            logging.warning("ENABLE_SCHEDULE is True but SCHEDULE configuration is missing. Schedule will not be set.")

    # Create the monitor definition with monitor_runtime for LLMaaJ
    monitor_runtime = {"type": "llm_as_a_judge"}
    monitor_def = self._ai_client.monitor_definitions.add(
        name=config['CUSTOM_MONITOR_NAME'],
        metrics=metrics,
        tags=tags,
        applies_to=monitor_level_applicability,
        background_mode=False,
        monitor_runtime=monitor_runtime,
        schedule=schedule
    ).result
    monitor_id = monitor_def.metadata.id
    self.results["custom_monitor_id"] = monitor_id
    logging.info(f"LLMaaJ monitor definition created with ID: {monitor_id}")
    return monitor_id
def _get_monitor_instance_id(self, subscription_id, custom_monitor_id):
"""
Get monitor instance ID for a given subscription and custom monitor.
:param str subscription_id: Subscription ID
:param str custom_monitor_id: Custom monitor definition ID
:return: Monitor instance ID or None if not found
:rtype: str or None
"""
try:
monitor_instances = self._ai_client.monitor_instances.list(
target_target_id=subscription_id,
monitor_definition_id=custom_monitor_id
).result.monitor_instances
if monitor_instances and len(monitor_instances) > 0:
return monitor_instances[0].metadata.id
return None
except Exception as e:
logging.warning(f"Could not retrieve monitor instance ID: {str(e)}")
return None
def _validate_llmaj_config(self, config):
"""
Validate the LLMaaJ configuration.
:param dict config: Configuration dictionary
:raises ValueError: If required parameters are missing or invalid
"""
# Set default INPUT_DATA_TYPE for LLMaaJ if not provided
if 'INPUT_DATA_TYPE' not in config or not config['INPUT_DATA_TYPE']:
config['INPUT_DATA_TYPE'] = 'unstructured_text'
logging.info("INPUT_DATA_TYPE not provided, defaulting to 'unstructured_text' for LLMaaJ")
# Note: SUBSCRIPTION_ID is NOT required - it's created by prompt_setup API
required_fields = [
'LLM_PROVIDER_CONFIG',
'CUSTOM_MONITOR_CONFIG',
'PROMPT_TEMPLATE_ASSET_ID',
'LABEL_COLUMN',
'OPERATIONAL_SPACE_ID',
'PROBLEM_TYPE',
'DATAMART_ID'
]
missing_fields = [field for field in required_fields if field not in config or not config[field]]
if missing_fields:
raise ValueError(f"Missing required configuration fields: {', '.join(missing_fields)}")
# Validate provider-specific configuration
provider_config = config.get('LLM_PROVIDER_CONFIG', {})
# Validate LLM provider type (now inside LLM_PROVIDER_CONFIG)
llm_provider = provider_config.get('type', '').lower()
if not llm_provider:
raise ValueError("Missing 'type' field in LLM_PROVIDER_CONFIG")
if llm_provider not in ['watsonx', 'watsonx.ai', 'openai']:
raise ValueError(f"Invalid LLM provider type: {llm_provider}. Must be 'watsonx', 'watsonx.ai', or 'openai'")
if llm_provider in ['watsonx', 'watsonx.ai']:
# Check if CPD or Cloud deployment using wos_client.is_cp4d
is_cpd = self._ai_client.is_cp4d
if is_cpd:
# CPD deployment: require url and either apikey OR (username + password)
if 'url' not in provider_config or not provider_config['url']:
raise ValueError("Missing required CPD field 'url' in LLM_PROVIDER_CONFIG")
has_apikey = 'apikey' in provider_config or 'API_KEY' in provider_config
has_username_password = (('CPD_USERNAME' in provider_config and 'CPD_PASSWORD' in provider_config) or
('username' in provider_config and 'password' in provider_config))
if not has_apikey and not has_username_password:
raise ValueError("CPD deployment requires either 'apikey' OR ('CPD_USERNAME' + 'CPD_PASSWORD') in LLM_PROVIDER_CONFIG")
else:
# Cloud deployment: require apikey (or API_KEY)
if 'apikey' not in provider_config and 'API_KEY' not in provider_config:
raise ValueError("Cloud deployment requires 'apikey' (or 'API_KEY') in LLM_PROVIDER_CONFIG")
else: # openai
required_provider_fields = ['API_KEY', 'MODEL_NAME']
missing_provider_fields = [field for field in required_provider_fields
if field not in provider_config or not provider_config[field]]
if missing_provider_fields:
raise ValueError(f"Missing required {llm_provider} provider fields: {', '.join(missing_provider_fields)}")
# Validate custom monitor configuration
monitor_config = config.get('CUSTOM_MONITOR_CONFIG', {})
# Set defaults for LLMaaJ-specific fields if not provided
if 'INPUT_DATA_TYPES' not in monitor_config or not monitor_config['INPUT_DATA_TYPES']:
monitor_config['INPUT_DATA_TYPES'] = ['unstructured_text']
logging.info("INPUT_DATA_TYPES not provided in CUSTOM_MONITOR_CONFIG, defaulting to ['unstructured_text'] for LLMaaJ")
if 'ALGORITHM_TYPES' not in monitor_config or not monitor_config['ALGORITHM_TYPES']:
# Supported problem types for LLMaaJ
supported_problem_types = [
'question_answering',
'summarization',
'generation',
'extraction',
'retrieval_augmented_generation',
'classification'
]
# Use PROBLEM_TYPE from config if provided, otherwise use all supported types
if 'PROBLEM_TYPE' in config:
monitor_config['ALGORITHM_TYPES'] = [config['PROBLEM_TYPE']]
logging.info(f"ALGORITHM_TYPES not provided in CUSTOM_MONITOR_CONFIG, using PROBLEM_TYPE: ['{config['PROBLEM_TYPE']}']")
else:
monitor_config['ALGORITHM_TYPES'] = supported_problem_types
logging.info(f"ALGORITHM_TYPES not provided in CUSTOM_MONITOR_CONFIG, defaulting to all supported types: {supported_problem_types}")
# MONITOR_METRICS is required - users must define their own metrics with proper grading_options
required_monitor_fields = ['CUSTOM_MONITOR_NAME', 'MONITOR_METRICS']
missing_monitor_fields = [field for field in required_monitor_fields
if field not in monitor_config or not monitor_config[field]]
if missing_monitor_fields:
raise ValueError(f"Missing required custom monitor fields: {', '.join(missing_monitor_fields)}")
[docs]
def create_llm_provider(self, llm_provider_type, provider_config, delete_llm_provider=True):
    """
    Create an integrated system for LLM evaluator (generative_ai_evaluator type).

    The integrated system is named ``LLM_Evaluator_watsonx_<datamart_id>`` for
    every provider type. An existing system with that name is deleted and
    recreated when ``delete_llm_provider`` is true, otherwise it is reused.

    :param str llm_provider_type: Provider type ('watsonx', 'watsonx.ai', or 'openai').
        'watsonx' is treated as an alias of 'watsonx.ai'.
    :param dict provider_config: Provider-specific configuration
    :param bool delete_llm_provider: Whether to delete existing LLM provider if found (default: True)
    :return: Integrated system ID
    :rtype: str
    :raises ValueError: If the provider type is unsupported, the WML location
        is missing for watsonx.ai, or the required credentials are absent.
    """
    is_cpd = self._ai_client.is_cp4d

    # 'watsonx' is accepted by _validate_llmaj_config and documented above,
    # so map it to the evaluator type instead of rejecting it.
    if llm_provider_type == 'watsonx':
        provider_type = 'watsonx.ai'
    elif llm_provider_type in ('watsonx.ai', 'openai'):
        provider_type = llm_provider_type
    else:
        raise ValueError("The provider type should be either watsonx.ai or openai ")
    # The name/description label stays 'watsonx' for all provider types so
    # that systems created earlier are still found by name below.
    llm_provider_type = 'watsonx'

    datamart_id = self._ai_client.service_instance_id
    provider_name = f"LLM_Evaluator_{llm_provider_type}_{datamart_id}"

    # Check if provider already exists
    existing_systems = IntegratedSystems(self._ai_client).list().result.integrated_systems
    for system in existing_systems:
        if system.entity.name != provider_name:
            continue
        if delete_llm_provider:
            logging.info(f"Deleting existing LLM provider: {provider_name}")
            IntegratedSystems(self._ai_client).delete(integrated_system_id=system.metadata.id)
        else:
            logging.info(f"Reusing existing LLM provider: {provider_name}")
            self.results["llm_provider_id"] = system.metadata.id
            self.results["llm_provider_name"] = system.entity.name
            return system.metadata.id

    credentials = {}
    wml_location = provider_config.get('WML_LOCATION') or provider_config.get('wml_location')
    if wml_location:
        credentials['wml_location'] = wml_location
    elif provider_type == 'watsonx.ai':
        raise ValueError("WML Location is required for watsonx.ai type ")

    url = provider_config.get('URL') or provider_config.get('url')
    if url:
        credentials['url'] = url

    apikey = provider_config.get('API_KEY') or provider_config.get('apikey')
    if is_cpd:
        # CPD deployment: accept either apikey OR username+password
        username = provider_config.get('CPD_USERNAME') or provider_config.get('USERNAME') or provider_config.get('username')
        password = provider_config.get('CPD_PASSWORD') or provider_config.get('PASSWORD') or provider_config.get('password')
        if provider_type == 'openai':
            # The original condition `apikey and cloud_remote or openai` bound
            # as `(apikey and cloud_remote) or openai`, which stored
            # apikey=None for OpenAI when no key was supplied.
            if not apikey:
                raise ValueError("OpenAI provider requires 'API_KEY' (or 'apikey') in provider_config")
            credentials['apikey'] = apikey
        elif apikey and wml_location == 'cloud_remote':
            credentials['apikey'] = apikey
        elif apikey and username:
            # Use username+apikey for CPD with basic auth
            credentials['auth_type'] = 'basic'
            credentials['auth_provider'] = 'cpd'
            credentials['username'] = username
            credentials['apikey'] = apikey
            logging.info("Using username+apikey for CPD authentication with auth_type=basic")
        elif username and password:
            # Use username+password for CPD with basic auth; the password is
            # sent under both 'apikey' and 'password'.
            credentials['auth_type'] = 'basic'
            credentials['auth_provider'] = 'cpd'
            credentials['username'] = username
            credentials['apikey'] = password
            credentials['password'] = password
            logging.info("Using username+password for CPD authentication with auth_type=basic")
        else:
            raise ValueError("CPD deployment requires either 'apikey' OR ('username' + 'password'/'apikey')")
    else:
        # Cloud deployment: use apikey with auth_type='api_key'
        if not apikey:
            raise ValueError("Cloud deployment requires 'apikey'")
        credentials['auth_type'] = 'api_key'
        credentials['apikey'] = apikey
        credentials['auth_provider'] = 'cloud'
        credentials['auth_url'] = 'https://iam.cloud.ibm.com/identity/token'
        logging.info("Using apikey for Cloud authentication with auth_provider=cloud")

    model_id = provider_config.get('model_id') or provider_config.get('MODEL_NAME') or provider_config.get('MODEL_ID')
    parameters = {
        "evaluator_type": provider_type,
        "model_id": model_id
    }
    # space_id / project_id are only added for watsonx.ai; OpenAI does not use them.
    if provider_type != 'openai':
        space_id = provider_config.get('SPACE_ID') or provider_config.get('space_id')
        project_id = provider_config.get('PROJECT_ID') or provider_config.get('project_id')
        if space_id:
            parameters['space_id'] = space_id
            logging.info(f"[DEBUG] Added space_id to integrated system parameters: {space_id}")
        elif project_id:
            parameters['project_id'] = project_id
            logging.info(f"[DEBUG] Added project_id to integrated system parameters: {project_id}")
        else:
            logging.warning("[DEBUG] No space_id/project_id found in llm_provider_config!")

    payload = {
        "name": provider_name,
        "description": f"LLM evaluator system for {llm_provider_type}",
        "type": "generative_ai_evaluator",
        "parameters": parameters,
        "credentials": credentials
    }
    system = self._ai_client.integrated_systems.add(**payload).result
    self.results["llm_provider_id"] = system.metadata.id
    self.results["llm_provider_name"] = system.entity.name
    return system.metadata.id
def _invoke_prompt_setup(self, config, custom_monitor_id, llm_provider_id):
"""
Invoke the Prompt Setup API to create subscription and monitor instance.
This matches the REST API flow: POST /v2/prompt_setup followed by polling GET /v2/prompt_setup.
Note: For WatsonX, provide either PROJECT_ID or SPACE_ID (not both).
- Use PROJECT_ID for project-based deployments
- Use SPACE_ID + DEPLOYMENT_ID for space-based deployments
:param dict config: Configuration dictionary
:param str custom_monitor_id: Custom monitor definition ID
:param str llm_provider_id: LLM evaluator integrated system ID
:return: Prompt setup response with subscription_id and service_provider_id
:rtype: dict
"""
# Validate that either project_id or space_id is provided
# Support both uppercase and lowercase keys
project_id = config.get('PROJECT_ID') or config.get('project_id')
space_id = config.get('SPACE_ID') or config.get('space_id')
deployment_id = config.get('DEPLOYMENT_ID') or config.get('deployment_id')
if not project_id and not space_id:
raise ValueError("Either PROJECT_ID or SPACE_ID must be provided in the configuration")
# If space_id is provided, deployment_id is mandatory
if space_id and not deployment_id:
raise ValueError("DEPLOYMENT_ID is mandatory when SPACE_ID is provided")
# Fetch the monitor definition to get actual metric IDs
monitor_def = self._ai_client.monitor_definitions.get(monitor_definition_id=custom_monitor_id).result
# Build a mapping from metric names to metric IDs
metric_name_to_id = {}
if hasattr(monitor_def.entity, 'metrics') and monitor_def.entity.metrics:
for metric in monitor_def.entity.metrics:
if hasattr(metric, 'name') and hasattr(metric, 'id'):
metric_name_to_id[metric.name] = metric.id
# Build monitors payload - read metrics from MONITOR_METRICS and use actual metric IDs
metrics = []
for metric_def in config.get('MONITOR_METRICS', []):
metric_name = metric_def.get('name')
# Use the actual metric ID from the monitor definition instead of the name
metric_id = metric_name_to_id.get(metric_name, metric_name)
metric = {
'metric_id': metric_id,
'dataset_type': metric_def.get('dataset_type', 'feedback')
}
# Add grader prompt mappings if provided
if 'grader_prompt_variables_mapping' in metric_def:
metric['grader_prompt_variables_mapping'] = metric_def['grader_prompt_variables_mapping']
# Add LLM provider ID (metric-specific or default)
if 'llm_provider_id' in metric_def:
metric['llm_provider_id'] = metric_def['llm_provider_id']
# Add thresholds if provided in metric definition
if 'thresholds' in metric_def:
threshold_cfg = metric_def['thresholds']
thresholds = []
for threshold_type, value in threshold_cfg.items():
if threshold_type.lower() == 'lower_limit':
thresholds.append({
'type': 'lower_limit',
'default': value
})
elif threshold_type.lower() == 'upper_limit':
thresholds.append({
'type': 'upper_limit',
'default': value
})
if thresholds:
metric['thresholds'] = thresholds
metrics.append(metric)
# Build parameters
# For LLMaaJ, llm_provider_id should only be in metrics_configuration
parameters = {
'min_records': config.get('MIN_RECORDS', 10),
'max_records': config.get('MAX_RECORDS', 100),
'metrics_configuration': {
'dataset_type': 'feedback',
'llm_provider_id': llm_provider_id,
'metrics': metrics
}
}
# Add space_id or project_id to parameters so monitor runs can access the LLM
# Support both uppercase and lowercase keys
space_id_param = config.get('SPACE_ID') or config.get('space_id')
if space_id_param:
parameters['space_id'] = space_id_param
project_id_param = config.get('PROJECT_ID') or config.get('project_id')
if project_id_param:
parameters['project_id'] = project_id_param
# Build thresholds from MONITOR_METRICS
thresholds = []
for metric_def in config.get('MONITOR_METRICS', []):
metric_name = metric_def.get('name')
metric_id = metric_name_to_id.get(metric_name, metric_name)
if 'thresholds' in metric_def:
threshold_cfg = metric_def['thresholds']
for threshold_type, value in threshold_cfg.items():
threshold_entry = {
'metric_id': metric_id,
'type': threshold_type.lower(),
'value': value
}
thresholds.append(threshold_entry)
# Build monitors dictionary
monitors = {
custom_monitor_id: {
'parameters': parameters,
'thresholds': thresholds
}
}
# Build query parameters for polling
query_params = {
'prompt_template_asset_id': config.get('PROMPT_TEMPLATE_ASSET_ID')
}
# Support both uppercase and lowercase keys
project_id_query = config.get('PROJECT_ID') or config.get('project_id')
if project_id_query:
query_params['project_id'] = project_id_query
space_id_query = config.get('SPACE_ID') or config.get('space_id')
if space_id_query:
query_params['space_id'] = space_id_query
deployment_id_query = config.get('DEPLOYMENT_ID') or config.get('deployment_id')
if deployment_id_query:
query_params['deployment_id'] = deployment_id_query
# Call prompt setup API (this will be async, need to poll)
mrm = ModelRiskManagement(self._ai_client)
# Build parameters dict dynamically (only include non-None values)
prompt_setup_params = {
'prompt_template_asset_id': config.get('PROMPT_TEMPLATE_ASSET_ID'),
'label_column': config.get('LABEL_COLUMN'),
'operational_space_id': config.get('OPERATIONAL_SPACE_ID'),
'problem_type': config.get('PROBLEM_TYPE'),
'input_data_type': config.get('INPUT_DATA_TYPE'),
'supporting_monitors': monitors
}
# Add optional parameters only if provided
# Support both uppercase and lowercase keys
project_id_setup = config.get('PROJECT_ID') or config.get('project_id')
if project_id_setup:
prompt_setup_params['project_id'] = project_id_setup
space_id_setup = config.get('SPACE_ID') or config.get('space_id')
if space_id_setup:
prompt_setup_params['space_id'] = space_id_setup
deployment_id_setup = config.get('DEPLOYMENT_ID') or config.get('deployment_id')
if deployment_id_setup:
prompt_setup_params['deployment_id'] = deployment_id_setup
# Support both uppercase and lowercase keys for RAG parameters
if config.get('CONTEXT_FIELDS') or config.get('context_fields'):
prompt_setup_params['context_fields'] = config.get('CONTEXT_FIELDS') or config.get('context_fields')
if config.get('QUESTION_FIELD') or config.get('question_field'):
prompt_setup_params['question_field'] = config.get('QUESTION_FIELD') or config.get('question_field')
if config.get('DATA_INPUT_LOCALE'):
prompt_setup_params['data_input_locale'] = config['DATA_INPUT_LOCALE']
if config.get('GENERATED_OUTPUT_LOCALE'):
prompt_setup_params['generated_output_locale'] = config['GENERATED_OUTPUT_LOCALE']
# Use the new wos.execute_prompt_setup() method instead of deprecated mrm.execute_prompt_setup()
from ibm_watson_openscale.wos import WOS
wos = WOS(self._ai_client, self.service_url)
response = wos.execute_prompt_setup(**prompt_setup_params)
return self._poll_prompt_setup(query_params)
def _poll_prompt_setup(self, query_params, max_attempts=30, poll_interval=2):
    """
    Poll the prompt setup API until it reaches a terminal state.

    Each attempt calls ``wos.get_prompt_setup(**query_params)`` and inspects
    ``status.state`` in the response. ``FINISHED`` returns the result,
    ``FAILED``/``ERROR`` raise immediately, and any other state is treated as
    "still in progress" and polled again after ``poll_interval`` seconds.
    Errors raised by the call itself are retried until the attempts run out.

    :param dict query_params: Keyword arguments passed unchanged to ``wos.get_prompt_setup()``
    :param int max_attempts: Maximum number of polling attempts
    :param int poll_interval: Interval between polls in seconds
    :return: Dict with the keys ``subscription_id``, ``service_provider_id``,
        ``status`` and ``full_response``
    :rtype: dict
    :raises Exception: If the setup reports ``FAILED``/``ERROR``, if the last
        polling attempt itself raises, or if no terminal state is reached
        within ``max_attempts`` attempts
    """
    import time

    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1

        # Only the remote call is guarded: errors here may be transient and are
        # retried. State handling happens outside the try so that a terminal
        # FAILED/ERROR is not swallowed by the retry handler.
        try:
            # Use the new wos.get_prompt_setup() method instead of deprecated mrm.get_prompt_setup()
            from ibm_watson_openscale.wos import WOS
            wos = WOS(self._ai_client, self.service_url)
            response = wos.get_prompt_setup(**query_params)
            result = response.result.to_dict() if hasattr(response, 'result') else response
        except Exception as e:
            if is_last_attempt:
                raise Exception(
                    f"Failed to poll prompt setup after {max_attempts} attempts: {str(e)}"
                ) from e
            logging.warning(
                f"Prompt setup poll attempt {attempt + 1}/{max_attempts} failed: {str(e)}"
            )
            time.sleep(poll_interval)
            continue

        if isinstance(result, dict):
            # A missing, None or non-dict status is treated as "no state yet".
            status = result.get('status') or {}
            state = status.get('state') if isinstance(status, dict) else None

            if state == 'FINISHED':
                # Extract subscription_id and service_provider_id
                return {
                    'subscription_id': result.get('subscription_id'),
                    'service_provider_id': result.get('service_provider_id'),
                    'status': status,
                    'full_response': result
                }

            if state in ('FAILED', 'ERROR'):
                # Terminal failure: build a concise message and raise right away.
                error_msg = status.get('message', 'No error message provided')
                failure_info = status.get('failure', {})
                error_code = None
                error_details_msg = None
                if isinstance(failure_info, dict):
                    error_code = failure_info.get('code')
                    error_details_msg = failure_info.get('message')
                    errors = failure_info.get('errors', [])
                    # The first entry of ``errors`` overrides the top-level
                    # code/message when it provides them.
                    if isinstance(errors, list) and errors:
                        first_error = errors[0]
                        if isinstance(first_error, dict):
                            error_code = first_error.get('code', error_code)
                            error_details_msg = first_error.get('message', error_details_msg)

                error_parts = [f"Prompt setup failed: {state}"]
                if error_code:
                    error_parts.append(f"Code: {error_code}")
                if error_details_msg:
                    error_parts.append(f"Message: {error_details_msg}")
                elif error_msg:
                    error_parts.append(f"Message: {error_msg}")
                error_summary = " | ".join(error_parts)
                logging.error(error_summary)
                raise Exception(error_summary)

            logging.info(f"Prompt setup in progress, state: {state}, attempt {attempt + 1}/{max_attempts}")

        # No point waiting after the final attempt; fall through to the timeout error.
        if not is_last_attempt:
            time.sleep(poll_interval)

    raise Exception(f"Prompt setup did not complete after {max_attempts} attempts")