Source code for ibm_watson_openscale.monitor_definitions

# coding: utf-8

# Copyright 2020 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect

from ibm_cloud_sdk_core import BaseService

from ibm_watson_openscale.base_classes.tables import Table
from .utils import *
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Monitors as BaseMonitors
from typing import Tuple

if TYPE_CHECKING:
    from .client import WatsonOpenScaleV2Adapter
    from .base_classes.watson_open_scale_v2 import MonitorMetricRequest, MonitorTagRequest, ApplicabilitySelection, \
        MonitorInstanceSchedule, DetailedResponse

_DEFAULT_LIST_LENGTH = 50


# TODO: Add parameters validation in every method
def _hidden_member(name: str, hint: str) -> property:
    """Build a property that raises ``AttributeError`` as soon as it is read.

    The base monitors class exposes methods that make no sense on the monitor
    definitions facade.  Shadowing them with a raising property hides them
    from ``hasattr`` and gives the user a pointer to the right API instead.

    :param str name: name of the hidden base-class method.
    :param str hint: trailing part of the error message telling the user what to use instead.
    :return: a read-only property whose getter always raises ``AttributeError``.
    """

    def _raise_not_implemented(self) -> None:
        raise AttributeError(f"{name}() is not implemented, {hint}")

    # Keep the getter introspectable under the name of the member it hides.
    _raise_not_implemented.__name__ = name
    return property(_raise_not_implemented)


def _hidden_definition_alias(name: str) -> property:
    """Hide ``<verb>_instance(s)`` and point the user at ``monitor_definitions.<verb>()``."""
    return _hidden_member(
        name, f"please use client.monitor_definitions.{name.split('_')[0]}() instead")


def _hidden_instance_member(name: str) -> property:
    """Hide a run/measurement method and point the user at ``client.monitor_instances``."""
    return _hidden_member(name, "please use client.monitor_instances instead")


class MonitorDefinitions(BaseMonitors):
    """
    Manages Monitor Definitions.

    On construction, and after every successful ``add``/``update``, the known
    monitor definitions are listed and exposed as ``self.MONITORS``: one
    attribute per definition (upper-cased name, spaces replaced by ``_``)
    holding ``ID`` and ``METRIC`` sub-attributes.
    """

    def __init__(self, ai_client: 'WatsonOpenScaleV2Adapter', project_id=None, space_id=None) -> None:
        """
        :param WatsonOpenScaleV2Adapter ai_client: client instance; must be a ``BaseService``.
        :param str project_id: project context used when listing definitions (optional)
        :param str space_id: space context used when listing definitions (optional)
        """
        validate_type(ai_client, 'ai_client', BaseService, True)
        self._ai_client = ai_client
        super().__init__(watson_open_scale=self._ai_client)
        # NOTE: this lists the monitor definitions, i.e. calls self.list() at construction time.
        self._refresh_monitors(project_id=project_id, space_id=space_id)

    ################################################
    #   Hidden methods from base monitor class     #
    ################################################

    # Definition-level operations live under shorter names on this class.
    list_instances = _hidden_definition_alias('list_instances')
    add_instance = _hidden_definition_alias('add_instance')
    get_instance = _hidden_definition_alias('get_instance')
    update_instance = _hidden_definition_alias('update_instance')
    delete_instance = _hidden_definition_alias('delete_instance')

    # Run / measurement operations belong to client.monitor_instances.
    run_instance = _hidden_instance_member('run_instance')
    list_runs = _hidden_instance_member('list_runs')
    get_run_instance = _hidden_instance_member('get_run_instance')
    update_run_instance = _hidden_instance_member('update_run_instance')
    get_run_logs_instance = _hidden_instance_member('get_run_logs_instance')
    publish_instance_measurements = _hidden_instance_member('publish_instance_measurements')
    list_instance_measurements = _hidden_instance_member('list_instance_measurements')
    get_instance_measurement = _hidden_instance_member('get_instance_measurement')
    list_instance_metrics = _hidden_instance_member('list_instance_metrics')
    list_measurements = _hidden_instance_member('list_measurements')

    def add(self,
            name: str,
            metrics: List['MonitorMetricRequest'],
            tags: List['MonitorTagRequest'],
            description: str = None,
            applies_to: 'ApplicabilitySelection' = None,
            parameters_schema: object = None,
            managed_by: str = None,
            schedule: 'MonitorInstanceSchedule' = None,
            monitor_runtime: 'MonitorRuntime' = None,
            project_id: str = None,
            space_id: str = None,
            background_mode: bool = True) -> Union['DetailedResponse', Optional[dict]]:
        """
        Add custom monitor.

        :param str name: Monitor UI label (must be unique).
        :param List[MonitorMetricRequest] metrics: A list of metric definition.
        :param List[MonitorTagRequest] tags: Available tags.
        :param str description: (optional) Long monitoring description presented
               in monitoring catalog.
        :param ApplicabilitySelection applies_to: (optional)
        :param object parameters_schema: (optional) JSON schema that will be used
               to validate monitoring parameters when enabled.
        :param str managed_by: (optional)
        :param MonitorInstanceSchedule schedule: (optional) The schedule used to
               control how frequently the target is monitored. The maximum
               frequency is once every 30 minutes. Defaults to once every hour if
               not specified.
        :param MonitorRuntime monitor_runtime: (optional) Field to specify if
               scheduler should be created or not.
        :param background_mode: if set to True, run will be in asynchronous mode, if set to False
                it will wait for result (optional)
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :type background_mode: bool
        :return: A `DetailedResponse` containing the result, headers and HTTP status code
                 (when ``background_mode`` is True), otherwise whatever
                 ``print_synchronous_run`` returns.
        :rtype: DetailedResponse with `MonitorDisplayForm` result

        A way you may use me:

        >>> from ibm_watson_openscale import *
        >>> metrics = [
                MonitorMetricRequest(
                    name='sensitivity',
                    thresholds=[MetricThreshold(
                        type=MetricThresholdTypes.LOWER_LIMIT,
                        default=0.8
                    )]
                ),
                MonitorMetricRequest(
                    name='specificity',
                    thresholds=[MetricThreshold(
                        type=MetricThresholdTypes.LOWER_LIMIT,
                        default=0.75
                    )]
                ),
            ]
        >>> tags = [
                MonitorTagRequest(
                    name='region',
                    description='customer geographical region'
                )
            ]
        >>> my_monitor = client.monitor_definitions.add(
                name='my model performance',
                metrics=metrics,
                tags=tags,
                background_mode=False)
        """
        response = super().add(name=name, metrics=metrics, tags=tags, description=description,
                               applies_to=applies_to, parameters_schema=parameters_schema,
                               managed_by=managed_by, schedule=schedule,
                               monitor_runtime=monitor_runtime,
                               project_id=project_id, space_id=space_id)
        self._refresh_monitors(project_id=project_id, space_id=space_id)

        monitor_definition_id = response.result.metadata.id

        if background_mode:
            return response

        def fetch_definition() -> None:
            # Raises while the freshly created definition cannot be fetched yet.
            self.get(monitor_definition_id=monitor_definition_id,
                     project_id=project_id, space_id=space_id)

        def check_state():
            # Any failure to fetch the definition is treated as "not ready yet";
            # this broad catch is deliberate so polling keeps going.
            try:
                fetch_definition()
            except Exception:
                return StatusStateType.ACTIVE
            return StatusStateType.FINISHED

        def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]:
            # The failure tuple is built inside the handler: the name bound by
            # `except ... as` is unbound once the clause ends, so it must not
            # be referenced afterwards.
            try:
                fetch_definition()
            except Exception as error:
                return "Add monitor definition failed", f'Reason: {error}', response
            return "Successfully finished adding monitor definition", None, response

        return print_synchronous_run(
            'Waiting for end of adding monitor definition {}'.format(monitor_definition_id),
            check_state,
            get_result=get_result,
            success_states=[StatusStateType.FINISHED]
        )

    def delete(self,
               monitor_definition_id: str,
               background_mode: bool = True,
               force: bool = False,
               project_id: str = None,
               space_id: str = None) -> Union['DetailedResponse', Optional[dict]]:
        """
        Delete custom monitor.

        :param str monitor_definition_id: Unique monitor definition ID.
        :param background_mode: if set to True, run will be in asynchronous mode, if set to False
                it will wait for result (optional)
        :param bool force: forwarded unchanged to the base class ``delete`` call (optional).
               NOTE(review): exact server-side semantics are not visible here - confirm.
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :type background_mode: bool
        :return: A `DetailedResponse` containing the result, headers and HTTP status code
                 (when ``background_mode`` is True), otherwise whatever
                 ``print_synchronous_run`` returns.
        :rtype: DetailedResponse

        A way you may use me:

        >>> client.monitor_definitions.delete(monitor_definition_id='997b1474-00d2-4g05-ac02-287ebfc603b5')
        """
        response = super().delete(monitor_definition_id=monitor_definition_id, force=force,
                                  project_id=project_id, space_id=space_id)
        # NOTE(review): unlike add()/update(), self.MONITORS is not refreshed
        # here, so it can still expose the deleted definition - confirm intent.

        if background_mode:
            return response

        def is_deleted() -> bool:
            # Compare ids exactly; a substring search over the stringified
            # listing would also match the id text occurring in other fields.
            listing = self.list(project_id=project_id, space_id=space_id)
            return all(definition.metadata.id != monitor_definition_id
                       for definition in listing.result.monitor_definitions)

        def check_state():
            return StatusStateType.FINISHED if is_deleted() else StatusStateType.ACTIVE

        def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]:
            if is_deleted():
                return "Successfully finished deleting monitor definition", None, response
            # TODO: Need to show the reason.
            return "Delete monitor definition failed", 'Reason: None', response

        return print_synchronous_run(
            'Waiting for end of deleting monitor definition {}'.format(monitor_definition_id),
            check_state,
            get_result=get_result,
            success_states=[StatusStateType.FINISHED]
        )

    def update(self,
               monitor_definition_id: str,
               name: str,
               metrics: List['MonitorMetricRequest'],
               tags: List['MonitorTagRequest'],
               description: str = None,
               applies_to: 'ApplicabilitySelection' = None,
               parameters_schema: object = None,
               managed_by: str = None,
               schedule: 'MonitorInstanceSchedule' = None,
               project_id: str = None,
               space_id: str = None
               ) -> 'DetailedResponse':
        """
        Edit custom monitor.

        Update monitor.

        :param str monitor_definition_id: Unique monitor definition ID.
        :param str name: Monitor UI label (must be unique).
        :param List[MonitorMetricRequest] metrics: A list of metric definition.
        :param List[MonitorTagRequest] tags: Available tags.
        :param str description: (optional) Long monitoring description presented
               in monitoring catalog.
        :param ApplicabilitySelection applies_to: (optional)
        :param object parameters_schema: (optional) JSON schema that will be used
               to validate monitoring parameters when enabled.
        :param str managed_by: (optional)
        :param MonitorInstanceSchedule schedule: (optional) The schedule used to
               control how frequently the target is monitored. The maximum
               frequency is once every 30 minutes. Defaults to once every hour if
               not specified.
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorDisplayForm` result

        A way you may use me:

        >>> from ibm_watson_openscale import *
        >>> metrics = [
                MonitorMetricRequest(
                    name='sensitivity',
                    thresholds=[MetricThreshold(
                        type=MetricThresholdTypes.LOWER_LIMIT,
                        default=0.7
                    )]
                ),
                MonitorMetricRequest(
                    name='specificity',
                    thresholds=[MetricThreshold(
                        type=MetricThresholdTypes.LOWER_LIMIT,
                        default=0.6
                    )]
                ),
            ]
        >>> tags = [
                MonitorTagRequest(
                    name='region',
                    description='customer geographical region'.upper()
                )
            ]
        >>> my_monitor = client.monitor_definitions.update(
                monitor_definition_id='monitor_definition_id',
                name='my model performance',
                metrics=metrics,
                tags=tags)
        """
        response = super().update(monitor_definition_id=monitor_definition_id, name=name,
                                  metrics=metrics, tags=tags, description=description,
                                  applies_to=applies_to, parameters_schema=parameters_schema,
                                  managed_by=managed_by, schedule=schedule,
                                  project_id=project_id, space_id=space_id)
        # Names or metrics may have changed, so rebuild the MONITORS lookup.
        self._refresh_monitors(project_id=project_id, space_id=space_id)
        return response

    #################################################
    #   New methods for monitor instances           #
    #################################################

    def show(self, limit: int = 10, project_id: str = None, space_id: str = None) -> None:
        """
        Show monitor definitions. By default 10 records will be shown.

        :param limit: maximal number of fetched rows. By default set to 10. (optional)
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :type limit: int

        A way you might use me is:

        >>> client.monitor_definitions.show()
        >>> client.monitor_definitions.show(limit=20)
        >>> client.monitor_definitions.show(limit=None)
        """
        validate_type(limit, u'limit', int, False)

        response = self.list(project_id=project_id, space_id=space_id)

        # One row per definition: id, display name, names of its metrics.
        records = [
            [definition.metadata.id,
             definition.entity.name,
             [metric.name for metric in definition.entity.metrics]]
            for definition in response.result.monitor_definitions
        ]
        columns = ['monitor id', 'monitor name', 'metrics names']

        Table(columns, records).list(
            limit=limit,
            default_limit=_DEFAULT_LIST_LENGTH,
            title="Monitor definitions"
        )

    def _refresh_monitors(self, project_id=None, space_id=None) -> None:
        """Rebuild ``self.MONITORS`` from the currently listed monitor definitions."""
        self.MONITORS = type(
            'MONITORS_', (), self._create_monitors_enum(project_id=project_id, space_id=space_id))

    def _create_monitors_enum(self, project_id=None, space_id=None):
        """
        List monitor definitions and map each one to a generated lookup class.

        :return: dict keyed by the definition name upper-cased with spaces
                 replaced by ``_``; values come from ``_create_metrics_enum``.
        """
        response = self.list(project_id=project_id, space_id=space_id)
        return {
            definition.entity.name.upper().replace(' ', '_'): self._create_metrics_enum(definition)
            for definition in response.result.monitor_definitions
        }

    def _create_metrics_enum(self, definition):
        """Return a class exposing the definition's ``ID`` and its ``METRIC`` lookup class."""
        return type('Monitor_', (), {
            'ID': definition.metadata.id,
            'METRIC': self._create_metrics_properties(definition)
        })

    @staticmethod
    def _create_metrics_properties(definition):
        """
        Return a class with one attribute per metric of ``definition``.

        The attribute name is the metric name upper-cased, spaces replaced by
        ``_`` and cut at the first ``(``; the attribute value is the metric id.
        """
        return type('MetricProperty_', (), {
            metric.name.upper().replace(' ', '_').split('(')[0]: metric.id
            for metric in definition.entity.metrics
        })