# coding: utf-8
# Copyright 2020, 2024 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from io import BufferedReader
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
from ibm_cloud_sdk_core import BaseService
from ibm_watson_openscale.base_classes.tables import Table
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import \
DataSets as BaseDataSets
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import (Records,
Requests)
from ibm_watson_openscale.supporting_classes.payload_record import \
PayloadRecord
from ibm_watson_openscale.utils.client_errors import IncorrectParameter
from .utils import *
if TYPE_CHECKING:
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import \
DetailedResponse
from .client import WatsonOpenScaleV2Adapter
_DEFAULT_LIST_LENGTH = 50
class DataSets(BaseDataSets):
"""
Manages Data Sets.
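
    A way you might use me (a sketch; assumes an authenticated ``APIClient``
    created elsewhere, e.g. with an IAM authenticator):

    >>> from ibm_watson_openscale import APIClient
    >>> client = APIClient(authenticator=authenticator)
    >>> client.data_sets.show()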
"""
def __init__(self, ai_client: 'WatsonOpenScaleV2Adapter') -> None:
validate_type(ai_client, 'ai_client', BaseService, True)
self._ai_client = ai_client
super().__init__(watson_open_scale=self._ai_client)
def show(self, limit: int = 10,
target_target_id: str = None,
target_target_type: str = None,
type: str = None,
managed_by: str = None,
project_id: str = None,
space_id: str = None,
**kwargs) -> None:
"""
        Show data sets. By default 10 records will be shown.
        :param limit: maximum number of rows to fetch. By default set to 10. (optional)
        :type limit: int
:param str target_target_id: (optional) ID of the data set target (e.g.
subscription ID, business application ID).
:param str target_target_type: (optional) type of the target.
:param str type: (optional) type of the data set.
:param str managed_by: (optional) ID of the managing entity (e.g. business
application ID).
:param dict headers: A `dict` containing the request headers
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:return: None
A way you might use me is:
>>> client.data_sets.show()
>>> client.data_sets.show(limit=20)
>>> client.data_sets.show(limit=None)
"""
validate_type(limit, u'limit', int, False)
response = self.list(target_target_id=target_target_id,
target_target_type=target_target_type,
type=type,
managed_by=managed_by,
project_id=project_id,
space_id=space_id,
**kwargs)
records = [[data_set.entity.data_mart_id,
data_set.entity.status.state,
data_set.entity.target.target_id,
data_set.entity.target.target_type,
data_set.entity.type,
data_set.metadata.created_at,
data_set.metadata.id
] for data_set in response.result.data_sets]
columns = ['data_mart_id', 'status', 'target_id',
'target_type', 'type', 'created_at', 'id']
Table(columns, records).list(
limit=limit,
default_limit=_DEFAULT_LIST_LENGTH,
title="Data sets"
)
def add(self,
data_mart_id: str,
name: str,
type: str,
target: 'Target',
data_schema: 'SparkStruct',
description: str = None,
schema_update_mode: str = None,
location: 'LocationTableName' = None,
managed_by: str = None,
project_id: str = None,
space_id: str = None,
background_mode: bool = True) -> Union['DetailedResponse', Optional[dict]]:
"""
Create Data Set.
:param str data_mart_id:
:param str name:
:param str type:
:param Target target:
:param SparkStruct data_schema:
:param str description: (optional)
:param str schema_update_mode: (optional)
:param LocationTableName location: (optional)
:param str managed_by: (optional)
        :param background_mode: if set to True, the call runs asynchronously; if set
               to False, it waits for the result (optional)
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:type background_mode: bool
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `DataSetResponse` result
A way you might use me is:
>>> from ibm_watson_openscale import *
>>> target = Target(
target_id='subscription_id',
target_type=TargetTypes.SUBSCRIPTION
)
>>> data_schema = SparkStruct(
type='struct',
fields=[
SparkStructField(
name='CheckingStatus',
type='string',
nullable=True,
metadata={'columnInfo': {'columnLength': 64},
'measure': 'discrete',
'modeling_role': 'feature'}
),
SparkStructField(
name='LoanDuration',
type='integer',
nullable=True,
metadata={'modeling_role': 'feature'}
),
SparkStructField(
name='asset_revision',
type='string',
nullable=True,
metadata={}
)
]
)
>>> custom_dataset_info = client.data_sets.add(
target=target,
name='custom dataset',
type='custom',
data_schema=data_schema,
data_mart_id='997b1474-00d2-4g05-ac02-287ebfc603b5'
)
"""
response = super().add(data_mart_id=data_mart_id, target=target, name=name, type=type,
data_schema=data_schema,
description=description, schema_update_mode=schema_update_mode, location=location,
managed_by=managed_by, project_id=project_id, space_id=space_id)
data_set_id = response.result.metadata.id
if background_mode:
return response
else:
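            # Poll the data set status until it reaches ACTIVE or fails.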
def check_state() -> dict:
details = self.get(data_set_id=data_set_id,
project_id=project_id, space_id=space_id)
return details.result.entity.status.state
def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]:
details = self.get(data_set_id=data_set_id,
project_id=project_id, space_id=space_id)
state = details.result.entity.status.state
if state in [StatusStateType.ACTIVE]:
return "Successfully finished adding data set", None, details
else:
return "Add data set failed with status: {}".format(state), \
'Reason: {}'.format(["code: {}, message: {}".format(error.code, error.message) for error in
details.result.entity.status.failure.errors]), details
return print_synchronous_run(
'Waiting for end of adding data set {}'.format(data_set_id),
check_state,
get_result=get_result,
success_states=[StatusStateType.ACTIVE]
)
def delete(self, data_set_id: str, background_mode: bool = True, force: bool = False,
project_id: str = None,
space_id: str = None) -> Union['DetailedResponse', Optional[dict]]:
"""
Delete Data Set.
        :param str data_set_id: ID of the data set.
        :param bool force: (optional) if set to True, force the deletion of the data set.
        :param background_mode: if set to True, the call runs asynchronously; if set
               to False, it waits for the result (optional)
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:type background_mode: bool
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse
        A way you might use me is:
>>> client.data_sets.delete(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5')
"""
        response = super().delete(data_set_id=data_set_id, force=force,
                                  project_id=project_id, space_id=space_id)
if background_mode:
return response
else:
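            # The delete API returns no per-request status, so poll the data
            # set list until the id no longer appears.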
def check_state() -> dict:
details = self.list(project_id=project_id, space_id=space_id)
if data_set_id not in str(details.result):
return StatusStateType.FINISHED
else:
return StatusStateType.ACTIVE
def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]:
details = self.list(project_id=project_id, space_id=space_id)
if data_set_id not in str(details.result):
state = StatusStateType.FINISHED
else:
state = StatusStateType.ACTIVE
if state in [StatusStateType.FINISHED]:
return "Successfully finished deleting data set", None, response
else:
# TODO: Need to show the reason.
return "Delete data set failed", 'Reason: None', response
return print_synchronous_run(
'Waiting for end of deleting data set {}'.format(data_set_id),
check_state,
get_result=get_result,
success_states=[StatusStateType.FINISHED]
)
def store_records(self,
data_set_id: str,
request_body: Union[str, 'BufferedReader', List['PayloadRecord'], List[List], List[Dict]],
header: bool = None,
skip: int = None,
limit: int = None,
delimiter: str = None,
on_error: str = None,
csv_max_line_length: float = None,
project_id: str = None,
space_id: str = None,
background_mode: bool = True) -> Union['DetailedResponse', Optional[dict]]:
"""
Store records to specific data set.
:param str data_set_id: ID of the data set.
        :param request_body: records to store; one of: a CSV payload (str or
            BufferedReader), a list of PayloadRecord objects, or a list of dicts.
:param bool header: (optional) if not provided service will attempt to
automatically detect header in the first line.
:param int skip: (optional) skip number of rows from input.
:param int limit: (optional) limit for number of processed input rows.
:param str delimiter: (optional) delimiter character for data provided as
csv.
:param str on_error: (optional) expected behaviour on error.
:param float csv_max_line_length: (optional) maximum length of single line
in bytes (default 10MB).
        :param background_mode: if set to True, the call runs asynchronously; if set
               to False, it waits for the result (optional)
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:type background_mode: bool
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `Status` result
A way you might use me is:
>>> from ibm_watson_openscale import *
>>> store_record_info = client.data_sets.store_records(
request_body=[
{
"GENDER": "M",
"PRODUCT_LINE": "Golf Equipment",
"AGE": 25,
"MARITAL_STATUS": "Unspecified",
"PROFESSION": "Sales"
},
{
"GENDER": "M",
"PRODUCT_LINE": "Sport shoes",
"AGE": 28,
"MARITAL_STATUS": "Married",
"PROFESSION": "Sales"
},
{
"GENDER": "F",
"PRODUCT_LINE": "Sport shoes",
"AGE": 25,
"MARITAL_STATUS": "Single",
"PROFESSION": "Software Developer"
}
],
data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
)
>>> from ibm_watson_openscale.supporting_classes.payload_record import PayloadRecord
>>> store_record_info = client.data_sets.store_records(
request_body=[PayloadRecord(
scoring_id='42e62c3ae2244f0d851009dec4754d74',
request={
"fields": ["CheckingStatus", "LoanDuration", "CreditHistory", "LoanPurpose",
"LoanAmount", "ExistingSavings", "EmploymentDuration", "InstallmentPercent", "Sex",
"OthersOnLoan", "CurrentResidenceDuration", "OwnsProperty", "Age", "InstallmentPlans",
"Housing", "ExistingCreditsCount", "Job", "Dependents", "Telephone", "ForeignWorker"],
"values": [["less_0", 4, "all_credits_paid_back", "car_new", 250, "less_100", "less_1", 2,
"male", "none", 1, "real_estate", 26, "stores", "rent", 1, "unskilled", 1,
"none", "yes"]]
},
response={
"fields": ["CheckingStatus", "LoanDuration", "CreditHistory", "LoanPurpose", "LoanAmount",
"ExistingSavings", "EmploymentDuration", "InstallmentPercent", "Sex", "OthersOnLoan",
"CurrentResidenceDuration", "OwnsProperty", "Age", "InstallmentPlans", "Housing",
"ExistingCreditsCount", "Job", "Dependents", "Telephone", "ForeignWorker",
"CheckingStatus_IX", "CreditHistory_IX", "EmploymentDuration_IX", "ExistingSavings_IX",
"ForeignWorker_IX", "Housing_IX", "InstallmentPlans_IX", "Job_IX", "LoanPurpose_IX",
"OthersOnLoan_IX", "OwnsProperty_IX", "Sex_IX", "Telephone_IX", "features", "rawPrediction",
"probability", "prediction", "predictedLabel"],
"values": [["less_0", 4, "all_credits_paid_back", "car_new", 250, "less_100", "less_1", 2,
"male", "none", 1, "real_estate", 26, "stores", "rent", 1, "unskilled", 1, "none", "yes",
1.0, 3.0, 3.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 2.0, 0.0, 0.0, [1.0, 3.0, 0.0, 0.0, 3.0,
0.0, 0.0, 2.0, 1.0, 1.0, 1.0, 0.0, 0.0, 4.0, 250.0, 2.0, 1.0, 4.0, 26.0, 1.0, 1.0],
[19.600662549552556, 0.39933745044744245], [
0.9800331274776278, 0.01996687252237212],
0.0, "No Risk"]]
},
response_time=460,
user_id='IBM-1234'
)],
data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
)
>>> csv_buffer_reader = io.open('path_to_csv', mode="rb")
>>> store_record_info = client.data_sets.store_records(
request_body=csv_buffer_reader,
delimiter=',',
header=True,
limit=100,
data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                csv_max_line_length=8196
)
"""
records_client = Records(watson_open_scale=self._ai_client)
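        # Dispatch on the type of request_body: a CSV stream, a list of
        # PayloadRecord objects, or a list of plain dict records.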
if isinstance(request_body, BufferedReader):
validate_type(request_body, "request_body",
[str, BufferedReader], True)
validate_type(header, "header", bool, True)
validate_type(delimiter, "delimiter", str, True)
validate_type(csv_max_line_length,
"csv_max_line_length", int, True)
response = records_client.add(
on_error=on_error,
delimiter=delimiter,
header=header,
limit=limit,
request_body=request_body,
csv_max_line_length=csv_max_line_length,
skip=skip,
data_set_id=data_set_id,
content_type='text/csv',
project_id=project_id,
space_id=space_id)
elif isinstance(request_body, list) and request_body and isinstance(request_body[0], PayloadRecord):
response = records_client.add(
request_body=[record.to_json() for record in request_body],
limit=limit,
data_set_id=data_set_id,
content_type='application/json',
project_id=project_id,
space_id=space_id)
elif isinstance(request_body, list) and request_body and isinstance(request_body[0], dict) and \
'fields' in str(request_body) and 'values' in str(request_body):
response = records_client.add(
request_body=request_body,
limit=limit,
data_set_id=data_set_id,
content_type='application/json',
project_id=project_id,
space_id=space_id)
elif isinstance(request_body, list) and request_body and isinstance(request_body[0], dict):
response = records_client.add(
request_body=request_body,
limit=limit,
data_set_id=data_set_id,
content_type='application/json',
project_id=project_id,
space_id=space_id)
else:
raise IncorrectParameter(
'request_body',
reason=f"request_body parameter should be one of: "
f"[BufferedReader, List[PayloadRecord], List[Dict[fields, values]], List[Dict]]")
request_id = response.headers._store['location'][1].split('/')[-1]
if background_mode:
return response
else:
def check_state() -> dict:
details = self.get_update_status(
data_set_id=data_set_id, request_id=request_id)
return details.result.state
def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]:
details = self.get_update_status(
data_set_id=data_set_id, request_id=request_id)
state = details.result.state
if state in [StatusStateType.ACTIVE]:
return "Successfully finished storing records", None, details
else:
return "Store records failed with status: {}".format(state), \
'Reason: {}'.format(details.result.failure), details
return print_synchronous_run(
'Waiting for end of storing records with request id: {}'.format(
request_id),
check_state,
get_result=get_result,
success_states=[StatusStateType.ACTIVE]
)
def get_update_status(self,
data_set_id: str,
request_id: str,
project_id: str = None,
space_id: str = None) -> 'DetailedResponse':
"""
Get status of the specific request.
:param str data_set_id: ID of the data set.
:param str request_id: ID of the request.
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `Status` result
A way you might use me is:
>>> update_status = client.data_sets.get_update_status(
data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
request_id='7843-00d35462-346-ac0672-7357'
)
"""
requests_client = Requests(watson_open_scale=self._ai_client)
return requests_client.get(data_set_id=data_set_id, request_id=request_id, project_id=project_id, space_id=space_id)
'''
def get_request_status(self, data_set_id: str, request_id: str, **kwargs) -> 'DetailedResponse':
"""
Get status of the specific request.
:param str data_set_id: ID of the data set.
:param str request_id: ID of the request.
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `Status` result
"""
if data_set_id is None:
raise ValueError('data_set_id must be provided')
if request_id is None:
raise ValueError('request_id must be provided')
headers = {}
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
sdk_headers = get_sdk_headers(service_name=self._ai_client.DEFAULT_SERVICE_NAME, service_version='V2',
operation_id='data_sets_get_request_status')
headers.update(sdk_headers)
url = '/v2/data_sets/{0}/requests/{1}'.format(
*self._ai_client.encode_path_vars(data_set_id, request_id))
request = self._ai_client.prepare_request(method='GET',
url=url,
headers=headers)
response = self._ai_client.send(request)
print(response.result)
response.result = Status.from_dict(response.result)
return response
'''
def get_list_of_records(self,
data_set_id: str,
limit: int = 100,
start: 'datetime' = None,
end: 'datetime' = None,
offset: int = None,
includes: str = None,
annotations: List[str] = None,
exclude_annotations: bool = None,
filter: str = None,
include_total_count: bool = None,
order: str = None,
format: str = None,
output_type: str = None,
include_internal_columns=False,
return_payload_raw_data=False,
project_id: str = None,
space_id: str = None
) -> Union['DetailedResponse', 'DataFrame']:
"""
List data set records.
:param str data_set_id: ID of the data set.
        :param int limit: By default it will return 100 records. If the
               value is greater than 1000, it will be truncated.
:param datetime start: (optional) return records with timestamp greater
than or equal to `start` parameter. Date string should be in the UTC ISO 8601 format. Ex: 2021-06-10T09:43:53.309Z
:param datetime end: (optional) return records with timestamp lower than
`end` parameter. Date string should be in the UTC ISO 8601 format. Ex: 2021-06-10T09:43:53.309Z
:param int offset: (optional) offset of returned records.
:param str includes: (optional) return records with only specified columns.
Parameter must be specified as comma separated string.
:param list[str] annotations: (optional) return record annotations with
given names.
        :param bool exclude_annotations: (optional) set to true if annotations do
               not need to be fetched at all; this should improve performance.
:param str filter: (optional) return records for which transaction ids in
associated data set the condition is met, format:
{data_set_id}.{field_name}:{op}:{value}.
:param bool include_total_count: (optional) If total_count should be
included. It can have performance impact if total_count is calculated.
:param str order: (optional) return records in order specified. There are
two patterns. The first is random sampling, the other is sorting per
column.
        :param str format: (optional) the JSON format to use for the output.
        :param str output_type: (optional) type of the returned data; default is 'dict',
               the other available option is 'pandas'
:param bool include_internal_columns: (optional) Flag to retrieve internal columns
:param bool return_payload_raw_data: (optional) Flag to retrieve only the raw data which was used for scoring.
Applicable to only Payload Logging data and format is 'pandas'
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `DataRecordsResponseCollection` result
A way you might use me is:
>>> records = client.data_sets.get_list_of_records(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5')
>>> records = client.data_sets.get_list_of_records(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
format=RecordsFormatTypes.DICT
)
>>> records = client.data_sets.get_list_of_records(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
format=RecordsFormatTypes.LIST
)
>>> records = client.data_sets.get_list_of_records(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
output_type=ResponseTypes.PYTHON
)
>>> records = client.data_sets.get_list_of_records(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
output_type=ResponseTypes.PANDAS
)
"""
records_client = Records(watson_open_scale=self._ai_client)
response = records_client.list(data_set_id=data_set_id, start=start, end=end,
limit=limit, offset=offset, includes=includes,
annotations=annotations, exclude_annotations=exclude_annotations,
filter=filter, include_total_count=include_total_count,
order=order, format=format, project_id=project_id, space_id=space_id)
if output_type is not None and ResponseTypes.PANDAS == output_type:
from pandas import DataFrame
# note: issue: planning-sdk-squad #1367
# fields = list(response.result.records[0].entity.values.keys())
# values = [list(record.entity.values.values()) for record in response.result.records]
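            # Columns whose names start with '_' are internal service columns;
            # they are dropped unless include_internal_columns is set.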
            if len(response.result['records']) > 0:
                if format == RecordsFormatTypes.DICT or format is None:
                    fields = list(
                        response.result['records'][0]['entity']['values'].keys())
                    rows = [list(record['entity']['values'].values())
                            for record in response.result['records']]
                else:
                    fields = list(response.result['records'][0]['fields'])
                    rows = [list(record['values'])
                            for record in response.result['records']]
                if include_internal_columns:
                    values = [[value for value, field in zip(row, fields)]
                              for row in rows]
                else:
                    values = [[value for value, field in zip(row, fields)
                               if not field.startswith('_')] for row in rows]
                    fields = [
                        field for field in fields if not field.startswith('_')]
            else:
                fields = []
                values = []
df = DataFrame.from_records(values, columns=fields)
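            # Keep only the columns marked with modeling_role 'feature', i.e.
            # the raw input data that was sent for scoring.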
if return_payload_raw_data and len(fields) > 0:
all_fields = self.get(data_set_id=data_set_id, project_id=project_id,
space_id=space_id).result.entity.data_schema.fields
input_features = []
for field in all_fields:
if ('modeling_role' in field['metadata']) and (field['metadata']['modeling_role'] == 'feature'):
input_features.append(field['name'])
df = df[input_features]
response.result = df
return response
def show_records(self,
data_set_id: str = None,
limit: int = 10,
project_id=None,
space_id=None) -> None:
"""
Show data set records. By default 10 records will be shown.
:param str data_set_id: ID of the data set.
        :param limit: maximum number of rows to fetch. By default set to 10. (optional)
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:type limit: int
:return: None
A way you might use me is:
>>> client.data_sets.show_records(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5')
"""
records: 'DataFrame' = self.get_list_of_records(
data_set_id=data_set_id,
limit=limit,
output_type='pandas',
project_id=project_id,
space_id=space_id).result
rows = records.values.tolist()
col_names = records.columns.tolist()
Table(col_names, rows).list(
limit=limit,
default_limit=_DEFAULT_LIST_LENGTH,
title="Data Set {} Records".format(
data_set_id
)
)
def print_records_schema(self, data_set_id: str = None,
project_id: str = None,
space_id: str = None) -> None:
"""
        Print the schema of data set records.
:param str data_set_id: ID of the data set.
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:return: None
A way you might use me is:
>>> client.data_sets.print_records_schema(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5')
"""
schema = self.get(data_set_id=data_set_id, project_id=project_id,
space_id=space_id).result.entity.data_schema
schema = schema.to_dict()
schema_records = [[field['name'],
field['type'],
field['nullable']
] for field in schema['fields']]
Table(['name', 'type', 'nullable'], schema_records).list(
title='Schema of {} data set'.format(data_set_id))
def get_records_count(self,
data_set_id: str = None,
project_id=None,
space_id=None) -> int:
"""
Count data set records.
:param data_set_id: ID of the data set.
:param str project_id: Id of the Project context in which governance operation is happening (optional)
:param str space_id: Id of the Space context in which governance operation is happening (optional)
:return: int
A way you might use me is:
>>> number_of_records = client.data_sets.get_records_count(data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5')
"""
response = self.get_list_of_records(
data_set_id=data_set_id, include_total_count=True, limit=0, project_id=project_id, space_id=space_id)
# note: issue: planning-sdk-squad #1367
# return response.result.total_count
return response.result.get('total_count')
def patch_records(self,
data_set_id: str,
patch_document: List['PatchDocument'],
*,
project_id: str = None,
space_id: str = None,
**kwargs):
"""
Update data set records.
:param str data_set_id: ID of the data set.
:param List[PatchDocument] patch_document: The list of documents to patch
:param str project_id: (optional) The GUID of the project.
:param str space_id: (optional) The GUID of the space.
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `Status` result
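        A way you might use me is (a sketch; the record id placeholder, the patched
        field and the PatchDocument import path are illustrative assumptions):
        >>> from ibm_watson_openscale.base_classes.watson_open_scale_v2 import PatchDocument
        >>> patch_response = client.data_sets.patch_records(
                data_set_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                patch_document=[PatchDocument(
                    op='replace',
                    path='/records/<record_id>/values/AGE',
                    value=26
                )]
            )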
"""
records_client = Records(watson_open_scale=self._ai_client)
response = records_client.patch(
patch_document=patch_document,
data_set_id=data_set_id,
project_id=project_id,
space_id=space_id,
**kwargs)
return response