Source code for ibm_watson_openscale.subscriptions

# coding: utf-8

# Copyright 2020,2021 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import Tuple, Optional, List, Union, TYPE_CHECKING
import uuid

from ibm_cloud_sdk_core import BaseService
from ibm_watson_openscale.base_classes.tables import Table
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Asset, AssetPropertiesRequest, \
    AssetDeploymentRequest, RiskEvaluationStatus
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import SparkStruct, IntegratedSystems, \
    IntegratedSystemsListEnums, AnalyticsEngine, JsonPatchOperation, DataSourceStatus, DataSource, \
    DataSourceConnection
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Subscriptions as BaseSubscriptions
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import SchemaUtility
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import ScoringEndpointRequest, ScoringEndpointCredentialsAzureScoringEndpointCredentials
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import Target

from .utils import *
import time
from datetime import datetime

import json
import tarfile
import copy
import requests

if TYPE_CHECKING:
    from .client import WatsonOpenScaleV2Adapter
    from ibm_watson_openscale.base_classes.watson_open_scale_v2 import DetailedResponse

_DEFAULT_LIST_LENGTH = 50


# TODO: Add parameters validation in every method
class Subscriptions(BaseSubscriptions):
    """
    Manages Subscription instance.
    """

    def __init__(self, ai_client: 'WatsonOpenScaleV2Adapter') -> None:
        validate_type(ai_client, 'ai_client', BaseService, True)
        self._ai_client = ai_client
        super().__init__(watson_open_scale=self._ai_client)
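    # Note: user code does not construct this class directly; it is exposed as `client.subscriptions`
    # on an authenticated client. A minimal sketch, assuming the authenticator has been set up
    # beforehand:
    #
    # >>> from ibm_watson_openscale import APIClient
    # >>> client = APIClient(authenticator=authenticator)
    # >>> client.subscriptions.show()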
    def show(self,
             limit: Optional[int] = 10,
             data_mart_id: str = None,
             service_provider_id: str = None,
             asset_asset_id: str = None,
             asset_asset_type: str = None,
             deployment_deployment_id: str = None,
             deployment_deployment_type: str = None,
             integration_reference_integrated_system_id: str = None,
             integration_reference_external_id: str = None,
             risk_evaluation_status_state: str = None,
             service_provider_operational_space_id: str = None,
             pre_production_reference_id: str = None,
             project_id: str = None,
             space_id: str = None,
             **kwargs) -> None:
        """
        Show subscriptions. By default 10 records will be shown.

        :param limit: maximal number of fetched rows. By default set to 10. (optional)
        :type limit: int
        :param str data_mart_id: (optional) comma-separated list of IDs.
        :param str service_provider_id: (optional) comma-separated list of IDs.
        :param str asset_asset_id: (optional) comma-separated list of IDs.
        :param str asset_asset_type: (optional) comma-separated list of types.
        :param str deployment_deployment_id: (optional) comma-separated list of IDs.
        :param str deployment_deployment_type: (optional) comma-separated list of types.
        :param str integration_reference_integrated_system_id: (optional) comma-separated list of IDs.
        :param str integration_reference_external_id: (optional) comma-separated list of IDs.
        :param str risk_evaluation_status_state: (optional) comma-separated list of states.
        :param str service_provider_operational_space_id: (optional) comma-separated list of operational space ids (property of service provider object).
        :param str pre_production_reference_id: (optional) comma-separated list of IDs.
        :param dict headers: A `dict` containing the request headers
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)

        A way you might use me is:

        >>> client.subscriptions.show()
        >>> client.subscriptions.show(limit=20)
        >>> client.subscriptions.show(limit=None)
        """
        validate_type(limit, u'limit', int, False)

        response = self.list(
            data_mart_id=data_mart_id,
            service_provider_id=service_provider_id,
            asset_asset_id=asset_asset_id,
            asset_asset_type=asset_asset_type,
            deployment_deployment_id=deployment_deployment_id,
            deployment_deployment_type=deployment_deployment_type,
            integration_reference_integrated_system_id=integration_reference_integrated_system_id,
            integration_reference_external_id=integration_reference_external_id,
            risk_evaluation_status_state=risk_evaluation_status_state,
            service_provider_operational_space_id=service_provider_operational_space_id,
            pre_production_reference_id=pre_production_reference_id,
            project_id=project_id,
            space_id=space_id,
            **kwargs
        )

        records = [[subscription.entity.asset.asset_id,
                    subscription.entity.asset.asset_type,
                    subscription.entity.asset.name,
                    subscription.entity.data_mart_id,
                    subscription.entity.deployment.deployment_id,
                    subscription.entity.deployment.name,
                    subscription.entity.service_provider_id,
                    subscription.entity.status.state,
                    subscription.metadata.created_at,
                    subscription.metadata.id
                    ] for subscription in response.result.subscriptions]
        columns = ['asset_id', 'asset_type', 'asset_name', 'data_mart_id', 'deployment_id', 'deployment_name',
                   'service_provider_id', 'status', 'created_at', 'id']

        Table(columns, records).list(
            limit=limit,
            default_limit=_DEFAULT_LIST_LENGTH,
            title="Subscriptions"
        )
    def add(self,
            data_mart_id: str,
            service_provider_id: str,
            asset: 'Asset',
            deployment: 'AssetDeploymentRequest',
            asset_properties: 'AssetPropertiesRequest' = None,
            risk_evaluation_status: 'RiskEvaluationStatus' = None,
            analytics_engine: 'AnalyticsEngine' = None,
            data_sources: List['DataSource'] = None,
            training_data_stats: dict = None,
            project_id: str = None,
            space_id: str = None,
            background_mode: bool = True,
            **kwargs) -> Union['DetailedResponse', Optional[dict]]:
        """
        Add a subscription to the model deployment.

        :param str data_mart_id: ID of the data_mart (required)
        :param str service_provider_id: ID of the service_provider (required)
        :param Asset asset: an Asset object with asset's information (required)
        :param AssetDeploymentRequest deployment: an AssetDeploymentRequest object with deployment's information (required)
        :param AssetPropertiesRequest asset_properties: (optional) Additional asset properties
               (subject of discovery if not provided when creating the subscription).
        :param RiskEvaluationStatus risk_evaluation_status: (optional)
        :param AnalyticsEngine analytics_engine: (optional)
        :param List[DataSource] data_sources: (optional)
        :param training_data_stats: Training statistic json generated using training stats notebook (https://github.com/IBM/watson-openscale-samples/blob/main/training%20statistics/training_statistics_notebook.ipynb)
        :param background_mode: if set to True, run will be in asynchronous mode, if set to False it will wait for result (optional)
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :type background_mode: bool
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SubscriptionResponse` result

        A way you may use me:

        >>> from ibm_watson_openscale import *
        >>> added_subscription_info = client.subscriptions.add(
                data_mart_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                service_provider_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                asset=Asset(...),
                deployment=AssetDeploymentRequest(...),
                asset_properties=AssetPropertiesRequest(...),
            )
        """
        validate_type(data_mart_id, 'data_mart_id', str, True)
        validate_type(service_provider_id, 'service_provider_id', str, True)

        if self._ai_client.check_entitlements is True and self._ai_client.plan_name == constants.LITE_PLAN:
            total_subscriptions = self.list(project_id=project_id, space_id=space_id).result.subscriptions
            if len(total_subscriptions) >= 5:
                raise Exception("You are not allowed to create more than 5 subscriptions for lite plan.")

        if asset_properties is None and training_data_stats is None:
            raise Exception("Either asset_properties or training_data_stats has to be passed")

        # If "other" data type is present in output or input schema then remove it from subscription before saving it.
        # https://github.ibm.com/aiopenscale/tracker/issues/19160
        if asset_properties is not None:
            asset_props = asset_properties.to_dict()
            if 'input_data_schema' in asset_props:
                input_schema = asset_props['input_data_schema']
                if input_schema is not None:
                    has_other_data_type = self._has_other_datatype(input_schema['fields'])
                    if has_other_data_type is True:
                        asset_properties.input_data_schema = None
            if 'output_data_schema' in asset_props:
                output_schema = asset_props['output_data_schema']
                if output_schema is not None:
                    has_other_data_type = self._has_other_datatype(output_schema['fields'])
                    if has_other_data_type is True:
                        asset_properties.output_data_schema = None
            if 'training_data_schema' in asset_props:
                training_schema = asset_props['training_data_schema']
                if training_schema is not None:
                    has_other_data_type = self._has_other_datatype(training_schema['fields'])
                    if has_other_data_type is True:
                        asset_properties.training_data_schema = None

        if training_data_stats is None:
            response = super().add(
                data_mart_id=data_mart_id,
                service_provider_id=service_provider_id,
                asset=asset,
                deployment=deployment,
                asset_properties=asset_properties,
                risk_evaluation_status=risk_evaluation_status,
                analytics_engine=analytics_engine,
                data_sources=data_sources,
                project_id=project_id,
                space_id=space_id)
        else:
            # Create subscription using data available in training stats
            if len(training_data_stats) == 0:
                raise Exception("training_data_stats is empty. Please re-generate and use it")
            response = self.__create_subscription_from_training_stats(
                data_mart_id, service_provider_id, training_data_stats, kwargs,
                background_mode, project_id=project_id, space_id=space_id)

        subscription_id = response.result.metadata.id
        self.__create_default_monitors(subscription_id, data_mart_id, service_provider_id)

        if background_mode:
            return response
        else:
            def check_state() -> dict:
                details = self.get(subscription_id=subscription_id, project_id=project_id, space_id=space_id)
                return details.result.entity.status.state

            def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]:
                details = self.get(subscription_id=subscription_id, project_id=project_id, space_id=space_id)
                state = details.result.entity.status.state

                if state in [StatusStateType.ACTIVE]:
                    return "Successfully finished adding subscription", None, details
                else:
                    return "Add subscription failed with status: {}".format(state), \
                           'Reason: {}'.format(["code: {}, message: {}".format(error.code, error.message)
                                                for error in details.result.entity.status.failure.errors]), details

            return print_synchronous_run(
                'Waiting for end of adding subscription {}'.format(subscription_id),
                check_state,
                get_result=get_result,
                success_states=[StatusStateType.ACTIVE]
            )
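    # A more concrete sketch of the online path (IDs and column names are illustrative; the
    # Asset/AssetDeploymentRequest/AssetPropertiesRequest keyword arguments mirror the ones used
    # elsewhere in this module):
    #
    # >>> subscription_details = client.subscriptions.add(
    #         data_mart_id=data_mart_id,
    #         service_provider_id=service_provider_id,
    #         asset=Asset(
    #             asset_id=model_uid,
    #             name="Credit risk model",
    #             url=model_url,
    #             asset_type=AssetTypes.MODEL,
    #             input_data_type=InputDataType.STRUCTURED,
    #             problem_type=ProblemType.BINARY_CLASSIFICATION),
    #         deployment=AssetDeploymentRequest(
    #             deployment_id=deployment_uid,
    #             name="Credit risk deployment",
    #             deployment_type=DeploymentTypes.ONLINE,
    #             url=scoring_url),
    #         asset_properties=AssetPropertiesRequest(
    #             label_column="Risk",
    #             probability_fields=["probability"],
    #             prediction_field="predicted_label",
    #             feature_fields=feature_columns,
    #             categorical_fields=categorical_columns),
    #         background_mode=False
    #     ).result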
    def delete(self,
               subscription_id: str,
               force: bool = None,
               project_id: str = None,
               space_id: str = None,
               background_mode: bool = True) -> Union['DetailedResponse', Optional[dict]]:
        """
        Delete subscription.

        :param str subscription_id: Unique subscription ID.
        :param bool force: (optional) force hard delete.
        :param background_mode: if set to True, run will be in asynchronous mode, if set to False it will wait for result (optional)
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :type background_mode: bool
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse

        A way you may use me:

        >>> client.subscriptions.delete(
                background_mode=False,
                subscription_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                force=True
            )
        """
        response = super().delete(subscription_id=subscription_id, force=force, project_id=project_id, space_id=space_id)

        if background_mode:
            return response
        else:
            def check_state() -> dict:
                details = self.list(project_id=project_id, space_id=space_id)
                if subscription_id not in str(details.result):
                    return StatusStateType.FINISHED
                else:
                    return StatusStateType.ACTIVE

            def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]:
                details = self.list(project_id=project_id, space_id=space_id)
                if subscription_id not in str(details.result):
                    state = StatusStateType.FINISHED
                else:
                    state = StatusStateType.ACTIVE

                if state in [StatusStateType.FINISHED]:
                    return "Successfully finished deleting subscription", None, response
                else:
                    return "Delete subscription failed", 'Reason: None', response  # TODO: Need to show the reason.

            return print_synchronous_run(
                'Waiting for end of deleting subscription {}'.format(subscription_id),
                check_state,
                get_result=get_result,
                success_states=[StatusStateType.FINISHED]
            )
    def _has_other_datatype(self, fields):
        for field in fields:
            type = field['type']
            if type == 'other':
                return True
        return False

    # Private method to create subscription using training stats data
    def __create_subscription_from_training_stats(self, data_mart_id, service_provider_id, training_stats_info,
                                                  params, background_mode, project_id=None, space_id=None):
        deployment_id = params.get("deployment_id")
        # Required in case of headless subscription
        deployment_name = params.get("deployment_name")
        deployment_space_id = params.get("deployment_space_id")

        model_type = training_stats_info['common_configuration']['problem_type']
        problem_type = None
        if model_type == "binary":
            problem_type = ProblemType.BINARY_CLASSIFICATION
        elif model_type == "multiclass":
            problem_type = ProblemType.MULTICLASS_CLASSIFICATION
        elif model_type == "regression":
            problem_type = ProblemType.REGRESSION

        prediction_column = params.get("prediction_field")
        probability_columns = params.get("probability_fields")
        predicted_target_column = params.get("predicted_target_field")
        prediction_names = params.get("prediction_names")
        transaction_id_field = params.get("transaction_id_field")
        input_token_count_field = params.get("input_token_count_field")
        output_token_count_field = params.get("output_token_count_field")

        # validate_type(deployment_id, 'deployment_id', str, True)
        validate_type(prediction_column, 'prediction_field', str, True)
        # validate_type(predicted_target_column, 'predicted_target_field', str, True)

        if deployment_id is None and deployment_name is None:
            raise Exception("Please provide deployment_id if you have deployment information or else provide deployment_name in case you want to create headless subscription")

        model_asset_details_from_deployment = {}
        deployment_found = False
        if deployment_id is not None and deployment_space_id is not None:
            model_asset_details_from_deployment = self._ai_client.service_providers.get_deployment_asset(
                data_mart_id=data_mart_id, service_provider_id=service_provider_id,
                deployment_id=deployment_id, deployment_space_id=deployment_space_id)
            if model_asset_details_from_deployment['metadata']['guid'] == deployment_id:
                deployment_found = True
            if not deployment_found:
                raise Exception("Deployment with id {} could not be found in deployment space {}. "
                                "Please check again".format(deployment_id, deployment_space_id))
        elif deployment_id is not None and deployment_space_id is None:
            # For non-WML model, pick the right deployment from list
            model_asset_details_from_deployment = self._ai_client.service_providers.list_assets(
                data_mart_id=data_mart_id, service_provider_id=service_provider_id).result
            models = model_asset_details_from_deployment['resources']
            for model in models:
                if model['metadata']['guid'] == deployment_id:
                    model_asset_details_from_deployment = model
                    deployment_found = True
                    break
            if not deployment_found:
                raise Exception("Deployment with id {} could not be found. Please check again".format(deployment_id))
        elif deployment_id is None and deployment_space_id is not None:
            raise Exception("Please provide deployment_id for space {}".format(deployment_space_id))
        else:
            print("Creating headless subscription as both deployment id and deployment space id are null")

        input_data_schema = training_stats_info["common_configuration"]["input_data_schema"]
        fields = []
        label_column = training_stats_info["common_configuration"]["label_column"]
        # remove label column entry from input data schema
        for field in input_data_schema["fields"]:
            if field["name"] == label_column:
                continue
            fields.append(field)

        required_input_data_schema = {
            "type": "struct",
            "fields": fields
        }
        training_data_schema = None

        if deployment_name is not None and not deployment_found:
            model_asset_id = str(uuid.uuid4())
            model_name = deployment_name
            model_url = "https://dummyModelUrl"
            deployment_asset_id = str(uuid.uuid4())
            deployment_name = deployment_name
            scoring_endpoint = "https://dummyScoringUrl"
        else:
            if not deployment_found:
                raise Exception("Deployment with id {} not found".format(deployment_id))
            model_asset_id = model_asset_details_from_deployment["entity"]["asset"]["asset_id"]
            model_name = model_asset_details_from_deployment["entity"]["asset"]["name"]
            model_url = model_asset_details_from_deployment["entity"]["asset"]["url"]
            deployment_asset_id = model_asset_details_from_deployment['metadata']['guid']
            deployment_name = model_asset_details_from_deployment['entity']['name']
            scoring_endpoint = model_asset_details_from_deployment['entity']['scoring_endpoint']['url']
            if model_asset_details_from_deployment is not None and len(model_asset_details_from_deployment) > 0:
                if "training_data_schema" in model_asset_details_from_deployment["entity"]["asset_properties"]:
                    training_data_schema = SparkStruct.from_dict(
                        model_asset_details_from_deployment["entity"]["asset_properties"]["training_data_schema"])

        return super().add(
            data_mart_id=data_mart_id,
            service_provider_id=service_provider_id,
            asset=Asset(
                asset_id=model_asset_id,
                name=model_name,
                url=model_url,
                asset_type=AssetTypes.MODEL,
                input_data_type=InputDataType.STRUCTURED,
                problem_type=problem_type
            ),
            deployment=AssetDeploymentRequest(
                deployment_id=deployment_asset_id,
                name=deployment_name,
                deployment_type=DeploymentTypes.ONLINE,
                url=scoring_endpoint
            ),
            asset_properties=AssetPropertiesRequest(
                transaction_id_field=transaction_id_field,
                label_column=label_column,
                probability_fields=probability_columns,
                prediction_field=prediction_column,
                predicted_target_field=predicted_target_column,
                prediction_names=prediction_names,
                feature_fields=training_stats_info["common_configuration"]["feature_fields"],
                categorical_fields=training_stats_info["common_configuration"]["categorical_fields"],
                input_data_schema=SparkStruct.from_dict(required_input_data_schema),
                training_data_schema=training_data_schema,
                input_token_count_field=input_token_count_field,
                output_token_count_field=output_token_count_field,
            ),
            project_id=project_id,
            space_id=space_id,
            background_mode=background_mode
        )
    def create_feedback_table(self,
                              subscription_id: str,
                              project_id: str = None,
                              space_id: str = None,
                              **kwargs) -> 'DetailedResponse':
        """
        Create a table for feedback dataset type.

        :param str subscription_id: Unique subscription ID.
        :param dict headers: A `dict` containing the request headers
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataSetResponse` result
        """
        dataset_type = "feedback"
        payload = {}
        return super().tables(subscription_id=subscription_id, dataset_type=dataset_type,
                              unknown_base_type=payload, project_id=project_id, space_id=space_id, **kwargs)
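    # A minimal usage sketch (assumption: `subscription_id` refers to an existing subscription
    # in this data mart):
    #
    # >>> client.subscriptions.create_feedback_table(subscription_id=subscription_id)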
    def poll_subscription_status(self, subscription_id, message, ids, delay, project_id=None, space_id=None):
        """
        Poll the subscription until it reaches the `active` or `error` state, waiting `delay`
        seconds between checks. On error, the integrated systems listed in `ids` and the
        subscription itself are deleted and an exception is raised.
        """
        subscription_status = None
        print(message)
        while subscription_status not in ("active", "error"):
            subscription = self.get(subscription_id, project_id=project_id, space_id=space_id).result.to_dict()
            subscription_status = get(subscription, "entity.status.state")
            if subscription_status not in ("active", "error"):
                print(datetime.now().strftime("%H:%M:%S"), subscription_status)
                time.sleep(delay)

        if subscription_status == "error":
            failure = get(subscription, "entity.status.failure")
            if failure is None:
                print("No failure information available")
            else:
                trace = get(failure, "trace")
                print("Trace: {}".format("" if not trace else trace))
                errors = get(failure, "errors")
                if errors is not None and len(errors) != 0:
                    error = errors[0]
                    error_code = get(error, "code")
                    print("Error code: {}".format("" if not error_code else error_code))
                    error_message = get(error, "message")
                    print("Error message: {}".format("" if not error_message else error_message))
            print("Deleting subscription and related integrated systems ..")
            self.__delete_integrated_sytems(ids, project_id=project_id, space_id=space_id)
            self.delete(subscription_id, project_id=project_id, space_id=space_id)
            raise Exception("Subscription in error state !")
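    # A hedged example of how this polling helper is typically driven (the `delay` is the number
    # of seconds between status checks, and `ids` lists integrated-system ids to clean up if the
    # subscription ends up in the error state):
    #
    # >>> client.subscriptions.poll_subscription_status(
    #         subscription_id, "Waiting for subscription to become active ...", ids=[], delay=15)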
    def create_subscription_using_training_data(self, subscription_name, datamart_id, service_provider_id, model_info,
                                                sample_csv, spark_credentials, data_warehouse_connection,
                                                payload_table, feedback_table, scored_training_data_table,
                                                managed_by, project_id=None, space_id=None,
                                                background_mode: bool = True):
        """
        Create batch subscription using model_info.

        :param str subscription_name: Name of this subscription.
        :param str datamart_id: Datamart id in which we want to create this subscription.
        :param str service_provider_id: Service Provider id.
        :param dict model_info: Information about the model which needs the subscription.

            Format for model_info:

                model_info = {
                    "model_type": "binary",
                    "label_column": "Risk",
                    "prediction": "predicted_label",
                    "probability": "probability",
                    "feature_columns": [list of feature columns]/None,
                    "categorical_columns": [list of categorical columns]/None,
                    "scoring": {
                        "url": "scoring url",  # This is required if explainability needs to be enabled
                        "token": "token for scoring"  # This is mandatory for Azure ML studio Model
                    }
                }

        :param csv sample_csv: Path to a sample csv file
        :param dict spark_credentials: Dictionary containing spark connection information.

            Format for spark_credentials:

                spark_credentials = {
                    "connection": {
                        "endpoint": <SPARK_ENDPOINT>,
                        "location_type": <LOCATION_TYPE>,
                        "display_name": <DISPLAY_NAME>,
                        "instance_id": <INSTANCE_ID>,
                        "volume": <VOLUME>
                    },
                    "credentials": {
                        "username": <USERNAME>,
                        "apikey": <APIKEY>
                    }
                }

        :param dict data_warehouse_connection: Dictionary containing data warehouse (DB connection) information.

            Format for the DB connection:

                data_warehouse_connection = {
                    "type": "jdbc",
                    "connection": {
                        "jdbc_driver": "com.ibm.db2.jcc.DB2Driver",
                        "use_ssl": False,
                        "certificate": None,
                        "jdbc_url": "jdbc:db2://<HOST>:50000/SAMPLE"
                    },
                    "credentials": {
                        "username": "<USERNAME>",
                        "password": "<PASSWORD>"
                    }
                }

        :param dict payload_table: Dictionary containing payload table connection information.

            Format for the payload table:

                payload_table = {
                    "data": {
                        "auto_create": True,
                        "database": "SAMPLE",
                        "schema": "KP",
                        "table": "PAYLOAD_TABLE"
                    },
                    "parameters": {
                        "partition_column": "WOS_PARTITION_COLUMN",
                        "num_partitions": 12
                    }
                }

        :param dict feedback_table: Dictionary containing feedback table connection information.
        :param dict scored_training_data_table: Dictionary containing scored training data table connection information.

            Format for scored_training_data_table:

                scored_training_data_table = {
                    "data": {
                        "auto_create": False,
                        "database": "SAMPLE",
                        "schema": "KP",
                        "table": "SCORED_TRAINING_DATA_TABLE"
                    },
                    "parameters": {
                        "partition_column": "WOS_PARTITION_COLUMN",
                        "num_partitions": 12
                    }
                }

        :param str managed_by: To identify whether the subscription is online or batch. It should be either `system` for online subscription or `self` for batch subscription.
        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        """
        if (managed_by == "self"):
            validate_type(datamart_id, 'data_mart_id', str, True)
            validate_type(service_provider_id, 'service_provider_id', str, True)
            validate_type(model_info, 'model_info', dict, True)
            validate_type(spark_credentials, 'spark_credentials', dict, True)
            validate_type(payload_table, 'payload_table', dict, True)
            validate_type(feedback_table, 'feedback_table', dict, True)
            validate_type(scored_training_data_table, 'scored_training_data_table', dict, True)
            self.__validate_data_warehouse_connetion(data_warehouse_connection)

            provider = self._ai_client.service_providers.get(service_provider_id, project_id=project_id, space_id=space_id).result

            # Take feature and categorical fields from user if provided.
            user_feature_fields = get(model_info, "feature_columns")
            user_categorical_field = get(model_info, "categorical_columns")
            label_column = get(model_info, "label_column")
            prediction_field = get(model_info, "prediction")
            probability_field = get(model_info, "probability")
            transaction_id_field = get(model_info, "transaction_id_field")
            prediction_names = get(model_info, "prediction_names")
            input_token_count_field = get(model_info, "input_token_count_field")
            output_token_count_field = get(model_info, "output_token_count_field")

            feature_fields = []
            categorical_fields = []

            # Get feature fields if provided by user
            isUserFeatureFields = False
            if (user_feature_fields is not None and len(user_feature_fields) != 0):
                # Validate that prediction, probability or label field is not added in feature columns
                if ((label_column in user_feature_fields) or (prediction_field in user_feature_fields) or (probability_field in user_feature_fields)):
                    raise Exception("Bad Input: Prediction, probability or label column is/are added as feature column(s).")
                feature_fields = user_feature_fields
                isUserFeatureFields = True

            # Get categorical fields if provided by user
            isUserCategoricalFields = False
            if (user_categorical_field is not None and len(user_categorical_field) != 0):
                # Validate that prediction, probability or label field is not added in categorical columns
                if ((label_column in user_categorical_field) or (prediction_field in user_categorical_field) or (probability_field in user_categorical_field)):
                    raise Exception("Bad Input: Prediction, probability or label column is/are added as categorical column(s).")
                # Validate that categorical fields are a subset of feature fields
                if (isUserFeatureFields and (not (set(user_categorical_field).issubset(set(user_feature_fields))))):
                    raise Exception("Bad Input: One or more categorical columns are not found in feature columns.")
                categorical_fields = user_categorical_field
                isUserCategoricalFields = True

            # Validate scoring details provided by user
            scoring = get(model_info, "scoring")
            scoring_url = None
            endpoint_credentials = None
            provider = self._ai_client.service_providers.get(service_provider_id, project_id=project_id, space_id=space_id).result
            if provider.entity.service_type != ServiceTypes.CUSTOM_MACHINE_LEARNING:
                if not scoring:
                    raise Exception("[Error] Missing scoring details : scoring details are mandatory to create subscription")
                if not get(scoring, "url"):
                    raise Exception("[Error] Missing scoring url : scoring url is mandatory to create subscription")
                if get(scoring, "token"):
                    endpoint_credentials = ScoringEndpointCredentialsAzureScoringEndpointCredentials(token=get(scoring, "token"))
                else:
                    if provider.entity.service_type == ServiceTypes.AZURE_MACHINE_LEARNING:
                        raise Exception("[Error] Missing Token : Token is mandatory to create subscription for Azure ML studio model")
            if scoring:
                scoring_url = get(scoring, "url")
                if not scoring_url:
                    print("[Warning] Missing scoring url : scoring url is mandatory if explain needs to be enabled")
            else:
                print("[Warning] Missing scoring details : scoring details are not provided")

            # Call config/datamart service API with CSV as payload to infer schema
            schemaUtil = SchemaUtility(self._ai_client)
            csv = open(sample_csv, 'rb')
            responseSchema = schemaUtil.spark_schemas_post(csv).result
            self.__validate_response_schema(responseSchema)

            input_fields = []
            output_fields = []
            training_fields = []
            fields = responseSchema.fields
            field_names = []
            for field in fields:
                field_names.append(field["name"])
                if (field['name'] == label_column):
                    field["metadata"]["modeling_role"] = "target"
                    training_fields.append(field)
                elif (field["name"] == prediction_field):
                    field["metadata"]["modeling_role"] = "prediction"
                    output_fields.append(field)
                elif (not (field["name"] == probability_field)):
                    field["metadata"]["modeling_role"] = "feature"
                    # If user provided feature columns
                    if isUserFeatureFields and field["name"] in feature_fields:
                        input_fields.append(field)
                        output_fields.append(field)
                        training_fields.append(field)
                    elif (not isUserFeatureFields):
                        feature_fields.append(field['name'])
                        input_fields.append(field)
                        output_fields.append(field)
                        training_fields.append(field)
                    # If user provided categorical columns
                    if (isUserCategoricalFields and field["name"] in categorical_fields):
                        field["metadata"]["measure"] = "discrete"
                    elif (not isUserCategoricalFields):
                        if field["name"] in feature_fields and field['type'] == "string":
                            categorical_fields.append(field['name'])
                            field["metadata"]["measure"] = "discrete"

            if (isUserFeatureFields):
                if (not (set(user_feature_fields).issubset(set(field_names)))):
                    raise Exception("Bad Input: One or more feature field provided are not present in the input csv")

            if (isUserCategoricalFields):
                if (not (set(user_categorical_field).issubset(set(field_names)))):
                    raise Exception("Bad Input: One or more categorical field provided are not present in the input csv")

            problem_type = get(model_info, "model_type")
            if (problem_type != ProblemType.REGRESSION):
                probability = {
                    "name": probability_field,
                    "type": {
                        "type": "array",
                        "elementType": "double",
                        "containsNull": True
                    },
                    "nullable": True,
                    "metadata": {
                        "modeling_role": "probability"
                    }
                }
                output_fields.append(probability)

            spark_engine_id = None
            db_integrated_system_id = None
            try:
                print("Creating integrated system for Spark")
                spark_engine_id = self.__create_spark_integrated_system(subscription_name, spark_credentials, project_id=project_id, space_id=space_id)
                print("Creating integrated system for Hive/DB2")
                db_integrated_system_id = self.__create_db2_hive_integrated_system(subscription_name, data_warehouse_connection, project_id=project_id, space_id=space_id)

                # Set asset details
                asset = Asset(
                    asset_id=str(uuid.uuid4()),
                    url="",
                    name=subscription_name,
                    asset_type=AssetTypes.MODEL,
                    input_data_type=InputDataType.STRUCTURED,
                    problem_type=problem_type
                )

                # Set deployment details
                asset_deployment = AssetDeploymentRequest(
                    deployment_id=str(uuid.uuid4()),
                    name=subscription_name,
                    description="This is {}".format(subscription_name),
                    deployment_type="batch",
                    scoring_endpoint=ScoringEndpointRequest(url=scoring_url, credentials=endpoint_credentials)
                )

                # Define input, output and training data schema
                input_data_schema = {"fields": input_fields, "type": responseSchema.type}
                output_data_schema = {"fields": output_fields, "type": responseSchema.type}
                training_data_scheme = {"fields": training_fields, "type": responseSchema.type}

                asset_properties_request = AssetPropertiesRequest(
                    transaction_id_field=transaction_id_field,
                    label_column=label_column,
                    probability_fields=None if not probability_field else [probability_field],
                    prediction_field=prediction_field,
                    prediction_names=prediction_names,
                    feature_fields=feature_fields,
                    categorical_fields=categorical_fields,
                    input_token_count_field=input_token_count_field,
                    output_token_count_field=output_token_count_field
                )

                analytics_engine = AnalyticsEngine(
                    type="spark",
                    integrated_system_id=spark_engine_id,
                    parameters=get(spark_credentials, "spark_settings", {})
                )

                subscription_details = self.add(
                    data_mart_id=datamart_id,
                    service_provider_id=service_provider_id,
                    asset=asset,
                    deployment=asset_deployment,
                    asset_properties=asset_properties_request,
                    analytics_engine=analytics_engine,
                    project_id=project_id,
                    space_id=space_id,
                    background_mode=True
                ).result
            except Exception as e:
                # Delete all the integrated systems created for this failed subscription
                print("Creation of subscription failed")
                print(e)
                ids = [spark_engine_id, db_integrated_system_id]
                self.__delete_integrated_sytems(ids, project_id=project_id, space_id=space_id)
                raise e

            subscription_id = subscription_details.metadata.id
            schemas_patch_document = [
                JsonPatchOperation(op=OperationTypes.REPLACE, path='/asset_properties/training_data_schema', value=training_data_scheme),
                JsonPatchOperation(op=OperationTypes.REPLACE, path='/asset_properties/input_data_schema', value=input_data_schema),
                JsonPatchOperation(op=OperationTypes.REPLACE, path='/asset_properties/output_data_schema', value=output_data_schema)
            ]
            ids = [spark_engine_id, db_integrated_system_id]
            message = "Updating schemas ..."
            self.poll_subscription_status(subscription_id, message, ids, 3, project_id=project_id, space_id=space_id)
            self._ai_client.subscriptions.update(subscription_id=subscription_id, patch_document=schemas_patch_document, project_id=project_id, space_id=space_id)
            print("Schemas update completed.")

            # Add selected tables as data sources
            data_sources = []
            source_type = get(data_warehouse_connection, "type")
            if len(payload_table) > 0 and get(payload_table, "data.table", None) is not None:
                data_sources.append(self.__get_data_source(payload_table, db_integrated_system_id, "payload", source_type))
            if len(feedback_table) > 0 and get(feedback_table, "data.table", None) is not None:
                data_sources.append(self.__get_data_source(feedback_table, db_integrated_system_id, "feedback", source_type))
            if len(scored_training_data_table) > 0 and get(scored_training_data_table, "data.table", None) is not None:
                scored_training_data_table["data"]["state"] = "active"
                data_sources.append(self.__get_data_source(scored_training_data_table, db_integrated_system_id, "training", source_type))

            message = "Updating data-sources ..."
            self.poll_subscription_status(subscription_id, message, ids, 1, project_id=project_id, space_id=space_id)

            # Patch datasources
            if len(data_sources) > 0:
                data_sources_patch_document = [
                    JsonPatchOperation(op=OperationTypes.ADD, path="/data_sources", value=data_sources)
                ]
                self._ai_client.subscriptions.update(subscription_id=subscription_id, patch_document=data_sources_patch_document, project_id=project_id, space_id=space_id)
                print("Data-sources update complete.")

            print("Subscription is created. Id is : {}".format(subscription_id))

            if not background_mode:
                message = "Waiting for subscription to be in active state ..."
                self.poll_subscription_status(subscription_id, message, ids, 15, project_id=project_id, space_id=space_id)
                print("Subscription is now in active state.")
            else:
                print("Subscription is being activated, please wait for state to be active before using it further.")
            return subscription_id

        elif (managed_by == "system"):
            raise Exception("Currently not supporting online subscription")
        else:
            raise Exception("The possible values for `managed_by` are either `system` for online subscription or `self` for batch subscription")
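    # A minimal end-to-end sketch for the batch path (all dictionaries follow the formats shown in
    # the docstring above; names and file paths are illustrative):
    #
    # >>> subscription_id = client.subscriptions.create_subscription_using_training_data(
    #         subscription_name="German Credit Risk - batch",
    #         datamart_id=data_mart_id,
    #         service_provider_id=service_provider_id,
    #         model_info=model_info,
    #         sample_csv="german_credit_sample.csv",
    #         spark_credentials=spark_credentials,
    #         data_warehouse_connection=data_warehouse_connection,
    #         payload_table=payload_table,
    #         feedback_table=feedback_table,
    #         scored_training_data_table=scored_training_data_table,
    #         managed_by="self",
    #         background_mode=False)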
    def create_subscription(self, subscription_name: str, datamart_id, service_provider_id, configuration_archive,
                            spark_credentials, data_warehouse_connection, payload_table: dict = {},
                            feedback_table: dict = {}, model_info: dict = {}, background_mode: bool = True,
                            project_id=None, space_id=None, **kwargs):
        """
        Create batch subscription

        :param str subscription_name: Name of the subscription.
        :param str datamart_id: Datamart id in which we want to create this subscription.
        :param str service_provider_id: Service Provider id.
        :param str configuration_archive: Path to the configuration archive file.
        :param dict spark_credentials: Dictionary containing spark connection information.

            Format for Remote Spark on hadoop:

                spark_credentials = {
                    "connection": {
                        "endpoint": "<SPARK_ENDPOINT>",
                        "location_type": "custom"
                    },
                    "credentials": {
                        "username": "<USERNAME>",
                        "password": "<PASSWORD>"
                    },
                    "spark_settings": {
                        "max_num_executors": <MAX_EXECUTOR_COUNT>,
                        "min_num_executors": <MIN_EXECUTOR_COUNT>,
                        "executor_cores": <EXECUTOR_CORE>,
                        "executor_memory": <EXECUTOR_MEMORY>,
                        "driver_cores": <NUM_OF_DRIVER_CORES>,
                        "driver_memory": <DRIVER_MEMORY>
                    }
                }

            Format for IAE Spark:

                spark_credentials = {
                    "connection": {
                        "display_name": "<IAE_INSTANCE_DISPLAYNAME>",
                        "endpoint": "<IAE_JOB_ENDPOINT>",
                        "location_type": "cpd_iae",
                        "volume": "<VOLUME_NAME>"
                    },
                    "credentials": {
                        "username": "<USERNAME>",
                        "apikey": "<APIKEY>"
                    },
                    "spark_settings": {
                        # Look at remote spark settings
                    }
                }

        :param dict data_warehouse_connection: Dictionary containing data warehouse (DB connection) information.

            Format for the DB connection:

                data_warehouse_connection = {
                    "type": "jdbc",
                    "connection": {
                        "jdbc_driver": "com.ibm.db2.jcc.DB2Driver",
                        "use_ssl": False,
                        "certificate": None,
                        "jdbc_url": "jdbc:db2://<HOST>:50000/SAMPLE"
                    },
                    "credentials": {
                        "username": "<USERNAME>",
                        "password": "<PASSWORD>"
                    }
                }

        :param dict payload_table: Dictionary containing payload table connection information.

            Format for the payload table:

                payload_table = {
                    "data": {
                        "auto_create": True,
                        "database": "SAMPLE",
                        "schema": "KP",
                        "table": "PAYLOAD_GERMAN_CREDIT_DATA_NEW"
                    },
                    "parameters": {
                        "partition_column": "WOS_PARTITION_COLUMN",
                        "num_partitions": 12
                    }
                }

        :param dict feedback_table: Dictionary containing feedback table connection information. Format for the feedback_table is same as payload table
        :param dict model_info: Information about the model which needs the subscription.

            Format for model_info:

                model_info = {
                    "model_type": "binary",
                    "label_column": "Risk",
                    "prediction": "predicted_label",
                    "probability": "probability",
                    "feature_columns": [list of feature columns]/None,
                    "categorical_columns": [list of categorical columns]/None,
                    "scoring": {
                        "url": "scoring url",  # This is required if explainability needs to be enabled
                        "token": "token for scoring"  # This is mandatory for Azure ML studio Model
                    }
                }

        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)
        :return: subscription id
        :rtype: str
        """
        validate_type(subscription_name, 'subscription_name', str, True)
        validate_type(datamart_id, 'data_mart_id', str, True)
        validate_type(service_provider_id, 'service_provider_id', str, True)
        # validate_type(configuration_archive, 'configuration_archive', str, True)
        validate_type(spark_credentials, 'spark_credentials', dict, True)
        validate_type(data_warehouse_connection, 'data_warehouse_connection', dict, True)
        validate_type(payload_table, 'payload_table', dict, False)
        validate_type(feedback_table, 'feedback_table', dict, False)
        validate_type(model_info, 'model_info', dict, False)

        self.__validate_data_warehouse_connetion(data_warehouse_connection)
        self.__validate_table_info(payload_table)
        if feedback_table is not None and len(feedback_table) > 0:
            self.__validate_table_info(feedback_table)

        scoring_url = None
        endpoint_credentials = None
        scoring = get(model_info, "scoring")
        if scoring:
            scoring_url = get(scoring, "url")
            if not scoring_url:
                print("[Warning] Missing scoring url : There is no scoring url provided")
            if get(scoring, "token"):
                endpoint_credentials = ScoringEndpointCredentialsAzureScoringEndpointCredentials(token=get(scoring, "token"))
            else:
                provider = self._ai_client.service_providers.get(service_provider_id, project_id=project_id, space_id=space_id).result
                if provider.entity.service_type == ServiceTypes.AZURE_MACHINE_LEARNING:
                    print("[Warning] Missing Token : There is no token provided for Azure ML studio model")

        # TODO validate parameters
        spark_engine_id = None
        db_integrated_system_id = None
        try:
            print("Creating integrated system for Spark")
            spark_engine_id = self.__create_spark_integrated_system(subscription_name, spark_credentials, project_id=project_id, space_id=space_id)
            print("Creating integrated system for Hive/DB2")
            db_integrated_system_id = self.__create_db2_hive_integrated_system(subscription_name, data_warehouse_connection, project_id=project_id, space_id=space_id)

            common_config = self.__get_common_configuration(configuration_archive)
            common_configuration = get(common_config, 'common_configuration')
            model_type = get(common_configuration, "model_type")
            if model_type is None:
                model_type = get(common_configuration, "problem_type")

            # Set asset details
            asset = Asset(
                asset_id=str(uuid.uuid4()),
                url="",
                name=subscription_name,
                asset_type=AssetTypes.MODEL,
                input_data_type=InputDataType.STRUCTURED,
                problem_type=model_type
            )

            # Set deployment details
            asset_deployment = AssetDeploymentRequest(
                deployment_id=str(uuid.uuid4()),
                name=subscription_name,
                description="This is {}".format(subscription_name),
                deployment_type="batch",
                scoring_endpoint=None if not scoring else ScoringEndpointRequest(url=scoring_url, credentials=endpoint_credentials)
            )

            probability_field = None
            if model_type != ProblemType.REGRESSION:
                probability_field = get(common_configuration, "probability_fields")
                if not probability_field:
                    probability_field = [get(common_configuration, "probability")]

            asset_properties_request = AssetPropertiesRequest(
                transaction_id_field=get(common_configuration, "transaction_id_field"),
                label_column=get(common_configuration, "label_column"),
                probability_fields=probability_field,
                prediction_field=get(common_configuration, "prediction"),
                prediction_names=get(common_configuration, "prediction_names"),
                feature_fields=get(common_configuration, "feature_columns"),
                categorical_fields=get(common_configuration, "categorical_columns"),
                input_token_count_field=get(common_configuration, "input_token_count_field"),
                output_token_count_field=get(common_configuration, "output_token_count_field")
            )

            analytics_engine = AnalyticsEngine(
                type="spark",
                integrated_system_id=spark_engine_id,
                parameters=get(spark_credentials, "spark_settings", {})
            )

            subscription_details = self.add(
                data_mart_id=datamart_id,
                service_provider_id=service_provider_id,
                asset=asset,
                deployment=asset_deployment,
                asset_properties=asset_properties_request,
                analytics_engine=analytics_engine,
                project_id=project_id,
                space_id=space_id,
                background_mode=True
            ).result
        except Exception as e:
            # Delete all the integrated systems created for this failed subscription
            print("Creation of subscription failed")
            print(e)
            # ids = [spark_engine_id, payload_integrated_system_id, feedback_integrated_system_id, training_integrated_system_id]
            ids = [spark_engine_id, db_integrated_system_id]
            self.__delete_integrated_sytems(ids, project_id=project_id, space_id=space_id)
            raise e

        subscription_id = subscription_details.metadata.id
        schemas_patch_document = [
            JsonPatchOperation(op=OperationTypes.REPLACE, path='/asset_properties/training_data_schema', value=common_configuration["training_data_schema"]),
            JsonPatchOperation(op=OperationTypes.REPLACE, path='/asset_properties/input_data_schema', value=common_configuration["input_data_schema"]),
            JsonPatchOperation(op=OperationTypes.REPLACE, path='/asset_properties/output_data_schema', value=common_configuration["output_data_schema"])
        ]
        ids = [spark_engine_id, db_integrated_system_id]
        message = "Updating schemas ..."
        self.poll_subscription_status(subscription_id, message, ids, 3, project_id=project_id, space_id=space_id)
        self._ai_client.subscriptions.update(subscription_id=subscription_id, patch_document=schemas_patch_document, project_id=project_id, space_id=space_id)
        print("Schemas update completed.")

        # Add selected tables as data sources
        data_sources = []
        source_type = get(data_warehouse_connection, "type")
        if len(payload_table) > 0 and get(payload_table, "data.table", None) is not None:
            data_sources.append(self.__get_data_source(payload_table, db_integrated_system_id, "payload", source_type))
        if feedback_table is not None and len(feedback_table) > 0 and get(feedback_table, "data.table", None) is not None:
            data_sources.append(self.__get_data_source(feedback_table, db_integrated_system_id, "feedback", source_type))

        message = "Updating data-sources ..."
        self.poll_subscription_status(subscription_id, message, ids, 1, project_id=project_id, space_id=space_id)

        # Patch datasources
        if len(data_sources) > 0:
            data_sources_patch_document = [
                JsonPatchOperation(op=OperationTypes.ADD, path="/data_sources", value=data_sources)
            ]
            self._ai_client.subscriptions.update(subscription_id=subscription_id, patch_document=data_sources_patch_document, project_id=project_id, space_id=space_id)
            print("Data-sources update complete.")

        print("Subscription is created. Id is : {}".format(subscription_id))

        if not background_mode:
            message = "Waiting for subscription to be in active state ..."
            self.poll_subscription_status(subscription_id, message, ids, 15, project_id=project_id, space_id=space_id)
            print("Subscription is now in active state.")
        else:
            print("Subscription is being activated, please wait for state to be active before using it further.")
        return subscription_id
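    # A minimal usage sketch (assumption: the archive is a .tar.gz containing
    # common_configuration.json, as expected by __get_common_configuration below; names and
    # paths are illustrative):
    #
    # >>> subscription_id = client.subscriptions.create_subscription(
    #         subscription_name="Batch subscription",
    #         datamart_id=data_mart_id,
    #         service_provider_id=service_provider_id,
    #         configuration_archive="configuration_archive.tar.gz",
    #         spark_credentials=spark_credentials,
    #         data_warehouse_connection=data_warehouse_connection,
    #         payload_table=payload_table,
    #         feedback_table=feedback_table,
    #         background_mode=False)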
    def __create_spark_integrated_system(self, subscription_name, spark_credentials, project_id=None, space_id=None):
        """
        Format of spark_credentials

        :param str project_id: Id of the Project context in which governance operation is happening (optional)
        :param str space_id: Id of the Space context in which governance operation is happening (optional)

        # Remote Spark example
        spark_credentials = {
            "connection": {
                "endpoint": "http://wimps1.fyre.ibm.com:5000",
                "location_type": "custom"
            },
            "credentials": {
                "username": "<USERNAME>",
                "password": "<PASSWORD>"
            },
            "spark_settings": {
                "max_num_executors": 1,
                "min_num_executors": 1,
                "executor_cores": 1,
                "executor_memory": 2,
                "driver_cores": 1,
                "driver_memory": 2
            }
        }

        # IAE Spark example
        spark_credentials = {
            "connection": {
                "display_name": "<IAE_DISPLAY_NAME>",
                "endpoint": "<IAE_JOB_ENDPOINT>",
                "location_type": "cpd_iae",
                "volume": "<VOLUME_NAME>"
            },
            "credentials": {
                "username": "admin",
                "apikey": "<APIKEY>"
            },
            "spark_settings": {
                "max_num_executors": 1,
                "min_num_executors": 1,
                "executor_cores": 1,
                "executor_memory": 2,
                "driver_cores": 1,
                "driver_memory": 2
            }
        }
        """
        spark_engine_details = IntegratedSystems(self._ai_client).add(
            name=subscription_name + " - Spark Engine",
            description=" ",
            type=IntegratedSystemsListEnums.Type.SPARK.value,
            credentials=get(spark_credentials, "credentials"),
            connection=get(spark_credentials, "connection"),
            project_id=project_id,
            space_id=space_id
        ).result

        spark_engine_id = spark_engine_details.metadata.id
        print("Integrated system {} created ".format(spark_engine_id))
        return spark_engine_id

    def __create_db2_hive_integrated_system(self, subscription_name, table_info, project_id=None, space_id=None):
        # system_types = list(set([get(payload_table,"type"), get(feedback_table,"type"), get(scored_training_data_table,"type")]))
        # if len(system_types)==1: #All are of same type
        connection_type = get(table_info, "type")
        hive_db2_connection = get(table_info, "connection")
        hive_db2_credentials = get(table_info, "credentials")
        if hive_db2_credentials is None:
            hive_db2_credentials = {}

        if connection_type == IntegratedSystemsListEnums.Type.HIVE.value:
            hive_db2_connection["location_type"] = "metastore"
        elif connection_type == IntegratedSystemsListEnums.Type.JDBC.value:
            hive_db2_connection["location_type"] = IntegratedSystemsListEnums.Type.JDBC.value
            hive_db2_credentials["jdbc_url"] = get(table_info, "connection.jdbc_url")

        hive_db2_connection_details = IntegratedSystems(self._ai_client).add(
            name=subscription_name + " - DB Info",
            description="",
            type=connection_type,
            credentials=hive_db2_credentials,
            connection=hive_db2_connection,
            project_id=project_id,
            space_id=space_id
        ).result

        hive_db2_connection_id = hive_db2_connection_details.metadata.id
        print("Hive/Db2 Integrated system {} created".format(hive_db2_connection_id))
        return hive_db2_connection_id

    def __delete_integrated_sytems(self, ids, project_id=None, space_id=None):
        print("Cleaning up integration systems")
        for id in ids:
            if id is not None:
                print("Deleting {}".format(id))
                IntegratedSystems(self._ai_client).delete(id, project_id=project_id, space_id=space_id)

    def __get_common_configuration(self, configuration_archive):
        common_config = None
        with tarfile.open(configuration_archive, 'r:gz') as tar:
            if "common_configuration.json" not in tar.getnames():
                raise Exception("common_configuration.json file is missing in archive file")
            json_content = tar.extractfile('common_configuration.json')
            data = json_content.read().decode()
            common_config = json.loads(data)
        return common_config

    def __validate_data_warehouse_connetion(self, connection_info):
        validate_type(get(connection_info, "type"), 'type', str, True)
        type = get(connection_info, "type")
        if type == "jdbc":
            validate_type(get(connection_info, "connection.use_ssl"), 'use_ssl', bool, False)
            validate_type(get(connection_info, "connection.jdbc_driver"), 'jdbc_driver', str, True)
            validate_type(get(connection_info, "connection.jdbc_url"), 'jdbc_url', str, True)
            validate_type(get(connection_info, "credentials.username"), 'username', str, True)
            validate_type(get(connection_info, "credentials.password"), 'password', str, True)
        else:
            validate_type(get(connection_info, "connection.metastore_url"), 'metastore_url', str, True)
            kerberos_enabled = get(connection_info, "connection.kerberos_enabled", None)
            if kerberos_enabled is not None:
                validate_type(get(connection_info, "connection.kerberos_enabled"), 'kerberos_enabled', bool, True)
                if kerberos_enabled:
                    validate_type(get(connection_info, "credentials.kerberos_principal"), 'kerberos_principal', str, True)
                    delegation_token_urn = get(connection_info, "credentials.delegation_token_urn")
                    delegation_token_endpoint = get(connection_info, "credentials.delegation_token_endpoint")
                    if delegation_token_urn is None and delegation_token_endpoint is None:
                        raise Exception("Either delegation_token_urn or delegation_token_endpoint value is required when connecting to Kerberos Hive")

    def __validate_table_info(self, table_info):
        validate_type(get(table_info, "data.auto_create"), 'auto_create', bool, True)
        validate_type(get(table_info, "data.database"), 'database', str, True)
        validate_type(get(table_info, "data.table"), 'table', str, True)

    def __validate_response_schema(self, schema):
        try:
            if not schema:
                raise Exception("spark_schema response is empty or none")
            if schema.type != "struct":
                raise Exception("The returned schema type is not struct.")
            if len(schema.fields) == 0:
                raise Exception("There are no fields in the returned schema.")
        except Exception as e:
            print("Schema validation failed.")
            raise e

    def __get_data_source(self, table_info, integrated_system_id, table_type, source_type):
        auto_create = get(table_info, "data.auto_create", False)
        state = get(table_info, "data.state", "new")
        status = DataSourceStatus(state=state)
        parameters = None
        if "parameters" in table_info:
            parameters = get(table_info, "parameters")

        data_source = DataSource(
            type=table_type,
            database_name=get(table_info, "data.database"),
            schema_name=get(table_info, "data.database") if source_type == IntegratedSystemsListEnums.Type.HIVE.value else get(table_info, "data.schema"),
            table_name=get(table_info, "data.table"),
            auto_create=auto_create,
            status=status,
            connection=DataSourceConnection(
                type=source_type,
                integrated_system_id=integrated_system_id
            ),
            parameters=parameters
        )
        return data_source.to_dict()

    def __create_default_monitors(self, subscription_id, data_mart_id, service_provider_id, project_id=None, space_id=None):
        target = Target(
            target_type=TargetTypes.SUBSCRIPTION,
            target_id=subscription_id
        )
        provider = self._ai_client.service_providers.get(service_provider_id, project_id=project_id, space_id=space_id).result
        operational_space_id = provider.entity.operational_space_id
        if operational_space_id == constants.PRODUCTION:
            # Create Model Health monitor
            if hasattr(self._ai_client.monitor_definitions.MONITORS, constants.MODEL_HEALTH):
                self.__create_monitor(data_mart_id, target, self._ai_client.monitor_definitions.MONITORS.MODEL_HEALTH.ID)

        # Create performance monitor for any type of subscription
        self.__create_monitor(data_mart_id, target, self._ai_client.monitor_definitions.MONITORS.PERFORMANCE.ID)

    def __create_monitor(self, data_mart_id, target, monitor_definition_id, parameters={}):
        monitor_instances_info = self._ai_client.monitor_instances.create(
            monitor_definition_id=monitor_definition_id,
            target=target,
            parameters=parameters,
            data_mart_id=data_mart_id,
            managed_by="user"
        )
        monitor_instance_id = monitor_instances_info.result.metadata.id