Source code for apamax.eplapplications.smartrules

## License
# Copyright (c) 2021-2022 Software AG, Darmstadt, Germany and/or its licensors

# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under the
# License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import json

# Constants for smart rule types
RULE_ON_ALARM_SEND_SMS = 'onAlarmSendSms'
RULE_ON_ALARM_SEND_EMAIL = 'onAlarmSendEmail'
RULE_ON_ALARM_ESCALATE = 'onAlarmEscalateAlarm'
RULE_INCREASE_ALARM_SEVERITY = 'onAlarmDurationIncreaseAlarmSeverity'
RULE_ON_GEOFENCE_CREATE_ALARM = 'onGeofenceCreateAlarm'
RULE_ON_GEOFENCE_SEND_EMAIL = 'onGeofenceSendEmail'
RULE_CALCULATE_ENERGY = 'calculateEnergyConsumption'
RULE_MISSING_MEASUREMENTS = 'onMissingMeasurementsCreateAlarm'
RULE_ON_ALARM_EXECUTE_OPERATION = 'onAlarmExecuteOperation'
RULE_EXPLICIT_THRESHOLD = 'explicitThresholdSmartRule'
RULE_THRESHOLD = 'thresholdSmartRule'

RULE_NAMES = {
	RULE_ON_ALARM_SEND_SMS: 'When alarm is received then SMS is sent',
	RULE_ON_ALARM_SEND_EMAIL: 'When alarm is received then email is sent',
	RULE_ON_ALARM_ESCALATE: 'Escalate alarm',
	RULE_INCREASE_ALARM_SEVERITY: 'Increase alarm severity when active for too long',
	RULE_ON_GEOFENCE_CREATE_ALARM: 'On geofence create alarm',
	RULE_ON_GEOFENCE_SEND_EMAIL: 'On geofence send email',
	RULE_CALCULATE_ENERGY: 'Calculates energy consumption',
	RULE_MISSING_MEASUREMENTS: 'Creates alarm when measurements are missing',
	RULE_ON_ALARM_EXECUTE_OPERATION: 'Executes an operation when alarm is received',
	RULE_EXPLICIT_THRESHOLD: 'Create alarm when measurement reaches explicit thresholds',
	RULE_THRESHOLD: 'Creates alarms when measurement reaches thresholds',
}

[docs]class SmartRule(object): """ Class representing a smart rule object. An instance of this class must be created by calling the appropriate build method of the class :class:`SmartRulesManager`. Call the `deploy` method to deploy a smart rule to Cumulocity IoT and call the `delete` method on a previously deployed smart rule to delete it from Cumulocity IoT. Any updates to the object are not deployed on Cumulocity IoT until the `deploy` method is called. """ NAME_PREFIX = 'PYSYS_' def __init__(self, ruleType, configuration, smartRulesManager): self.ruleType = ruleType # Name of the smart rule template self.ruleName = None # Name of the smart rule - set below self.configuration = configuration or {} # Smart rule configuration self.enabledSources = None # List of device IDs for which smart rule should be enabled self.disabledSources = None # List of device IDs for which smart rule should be enabled self.enabled = True # Whether the smart rule should be enabled self.managedObjectId = None # The ID of the managed object for a local smart rule self.connection = smartRulesManager.connection # The connection object self.log = smartRulesManager.log # The logger object self._id = None # ID of the smart rule managed object. self._cepModuleId = None # Internal ID of the smart rule. self._roleScope = None # Rule is 'global' or 'local' self.setRuleName(RULE_NAMES.get(ruleType, ruleType))
[docs] def getID(self): """ Get the ID of the smart rule. Returns None if the smart rule is deleted or not yet deployed. :return: ID of the smart rule. :rtype: str """ return self._id
[docs] def getRuleName(self): """ Get the name of the smart rule. :return: The name of the smart rule. :rtype: str """ return self.ruleName
[docs] def setRuleName(self, name, addTestPrefix=True): """ Set the name of the smart rule. :param str name: The name. :param bool addTestPrefix: Add a test prefix to the name of the smart rule so that it can be identified and cleaned up as part of the test setup. :return: The updated smart rule object. :rtype: :class:`SmartRule` """ if addTestPrefix: name = SmartRule.NAME_PREFIX + name self.ruleName = name return self
[docs] def isGlobal(self): """ Check if the smart rule is global. :return: `True` if the smart rule is global, `False` otherwise. :rtype: bool """ return (self._roleScope == 'global') or (self.managedObjectId is None)
[docs] def setGlobal(self): """ Set the smart rule to be global. If a smart rule is previously set to be local to a device or group, it cannot be changed to be global. :return: The updated smart rule object. :rtype: :class:`SmartRule` """ if self._roleScope == 'local' or (self.managedObjectId is not None): raise Exception('The smart rule cannot be marked global as it is already marked local.') self.managedObjectId = None self._roleScope = 'global' return self
[docs] def isLocal(self): """ Check if the smart rule is local to a device or a group. :return: `True` if the smart rule is local to a device or a group, `False` otherwise. :rtype: bool """ return not self.isGlobal()
[docs] def setLocal(self, deviceOrGroupId): """ Set the smart rule to be local to the specified device or group. If a smart rule is previously set to be global, it cannot be changed to be local to a device or group. :param str deviceOrGroupId: The ID of the device or group. :return: The updated smart rule object. :rtype: :class:`SmartRule` """ if self._roleScope == 'global': raise Exception('The smart rule cannot be marked local as it is already marked global.') if (self.managedObjectId is not None) and (self.managedObjectId != deviceOrGroupId): raise Exception(f'The smart rule cannot be marked local as it is already marked local for device {self.managedObjectId}.') if deviceOrGroupId is None: raise Exception('The specified device or group cannot be None.') self.managedObjectId = deviceOrGroupId if isinstance(deviceOrGroupId, str) else deviceOrGroupId['id'] self._roleScope = 'local' return self
[docs] def getConfiguration(self): """ Get the configuration of the smart rule instance. :return: The configuration. :rtype: dict """ return self.configuration
[docs] def updateConfiguration(self, configuration): """ Update the configuration of the smart rule instance. :param dict[str,any] configuration: A dictionary of the updated configuration values. :return: The updated smart rule object. :rtype: :class:`SmartRule` """ self.configuration.update(configuration) return self
[docs] def getEnabledSources(self): """ Get the list of device IDs for which this smart rule is enabled. The list will be empty for a global smart rule that is enabled for all devices. :return: The list of device IDs. :rtype: list """ return self.enabledSources
[docs] def setEnabledSources(self, deviceList): """ Set the list of device IDs for which this smart rule must be enabled. If the smart rule is local to a group, all devices must be part of the group. :param str deviceList: The list of devices or device IDs. :return: The updated smart rule object. :rtype: :class:`SmartRule` """ self.enabledSources = self._toDeviceIds(deviceList) return self
[docs] def getDisabledSources(self): """ Get the list of device IDs for which this smart rule is disabled. :return: The list of device IDs. :rtype: list """ return self.disabledSources
[docs] def setDisabledSources(self, deviceList): """ Set the list of device IDs for which this smart rule must be disabled. The disabled sources list will be ignored if the enabled sources list is also set. :param list deviceList: The list of devices or device IDs. :return: The updated smart rule object. :rtype: :class:`SmartRule` """ self.disabledSources = self._toDeviceIds(deviceList) return self
[docs] def isEnabled(self): """ Check if the smart rule is enabled. :return: `True` if the smart rule is enabled, `False` otherwise. :rtype: bool """ return self.enabled
[docs] def setEnabled(self, enabled=True): """ Enable or disable the smart rule. :param bool enabled: The smart rule is enabled if `True`, otherwise it is disabled. :return: The updated smart rule object. :rtype: :class:`SmartRule` """ self.enabled = enabled return self
[docs] def deploy(self, **kwargs): """ Deploy the smart rule to Cumulocity IoT. """ if self._id is None and self._cepModuleId is None: # create smart rule self._createRule(**kwargs) else: self._updateRule(**kwargs)
[docs] def delete(self, **kwargs): """ Delete the smart rule from Cumulocity IoT. """ if self._id is None or self._cepModuleId is None: raise Exception(f'Trying to delete previously deleted or not yet deployed {self._getDesc()}') try: self.connection.request("DELETE", self._getEndPoint(), **kwargs) # Reset the IDs so that if deploy is called on this object, we create new smart rule instance. self._resetIds() self.log.debug(f'Deleted {self._getDesc()}') except Exception as err: raise Exception(f'Failed to delete {self._getDesc()} using DELETE on {self.connection.base_url}{self._getEndPoint()}: {err}')
def _getDesc(self): """ Get the description of the smart rule for logging. :return: A basic description of the smart rule. :rtype: str """ attrs = { 'type':self.ruleType } if self._id is not None: attrs['ruleId'] = self._id attrs_str = ', '.join([f'{k}={v}' for (k,v) in attrs.items()]) ruleContext = 'global' if self.isGlobal() else 'local' return f"{ruleContext} smart rule '{self.ruleName}' ({attrs_str})" def _createRule(self, **kwargs): """ Create the smart rule on Cumulocity IoT. """ self._resetIds() body = self._getRequestBody() try: self._getEndPoint() resp = self.connection.do_request_json("POST", self._getEndPoint(), body=body, useLocationHeaderPostResp=False, **kwargs) resp = json.loads(resp) self._extractAndSaveIds(resp) self.log.debug(f'Created {self._getDesc()}') except Exception as err: raise Exception(f'Failed to create {self._getDesc()} using POST on {self.connection.base_url}{self._getEndPoint()}: {err}') def _updateRule(self, **kwargs): """ Update the smart rule on Cumulocity IoT. """ body = self._getRequestBody() # c8y doesn't like if type is sent when updating a rule if 'type' in body: body.pop('type') if 'cepModuleId' in body: body.pop('cepModuleId') try: resp = self.connection.do_request_json("PUT", self._getEndPoint(), body=body, **kwargs) resp = json.loads(resp) self._extractAndSaveIds(resp) self.log.debug(f'Updated {self._getDesc()}') except Exception as err: raise Exception(f'Failed to update {self._getDesc()} using PUT on {self.connection.base_url}{self._getEndPoint()}: {err}') @staticmethod def _toDeviceIds(deviceList): """ Extract the device IDs from the list of the devices. :param deviceList: Device(s). :type deviceList: str, dict, or list :return: List of device IDs. :rtype: list[str] """ if deviceList is None: return None result = [] deviceList = deviceList or [] if isinstance(deviceList, (str, dict)): deviceList = [deviceList] for device in deviceList: # This can either be the id or a ManagedObject id = device if isinstance(device, str) else device['id'] if id: result.append(str(id)) return result def _getRequestBody(self): """ Get the body of the smart rule used for creating/updating a smart rule using the REST API. :return: a dictionary containing all the data required to create/update the smart rule using REST API. :rtype: dict """ request = { 'ruleTemplateName': self.ruleType, 'name': self.ruleName, 'config': self.configuration, 'enabled': self.enabled, } if self.isLocal(): request['c8y_Context'] = {'id':self.managedObjectId, 'context': 'device'} request['enabledSources'] = (self.getEnabledSources() or []) else: # Set either enabledSources or disabledSources. If enabledSources is set, ignore the disabledSources. if self.getEnabledSources() is not None: request['enabledSources'] = self.getEnabledSources() else: request['disabledSources'] = (self.getDisabledSources() or []) return request def _extractAndSaveIds(self, response): """ Extract the ID and cepModuleId of the smart rule and save them. These are created by Cumulocity IoT when a smart rule is successfully created. :param dict response: The response from Cumulocity IoT. """ self._id = response['id'] self._cepModuleId = response['cepModuleId'] def _resetIds(self): """ Reset the id and cepModuleId of the smart rule. """ self._id = None self._cepModuleId = None def _getEndPoint(self): """ Gets the REST endpoint associated with the smart rule. :return: The REST endpoint of the smart rule. :rtype: str """ return '/service/smartrule' + ( '/managedObjects/%s' % self.managedObjectId if self.managedObjectId is not None else '') + '/smartrules' + ('/%s' % self._id if self._id is not None else '') def _isTestSmartRule(self): """ Checks if the smart rule was created by the test framework as part of a test. :return: `True` if the smartRule was created by the framework, `False` otherwise. :rtype: bool """ return self.ruleName.startswith(SmartRule.NAME_PREFIX)
[docs]class SmartRulesManager(object): """ Class responsible for building smart rules objects that can be deployed on Cumulocity IoT. See the methods in the :class:`SmartRule` class to customize a smart rule object and deploy it. :param tenant: The Cumulocity IoT tenant. :type tenant: :class:`~apamax.eplapplications.tenant.CumulocityTenant`. :param log: The `logger` instance to use for logging. """ def __init__(self, tenant, log): self.connection = tenant.getConnection() self.log = log
[docs] def getAllSmartRules(self, withLocalRules=False): """ Get all smart rules deployed on Cumulocity IoT. :param withLocalRules: If `True`, also include smart rules local to a device or group. :return: List of smart rule objects. :rtype: list[:class:`SmartRule`] """ withLocalRules = 'true' if withLocalRules else 'false' response = self.connection.request('GET', f'/service/smartrule/smartrules?withPrivateRules={withLocalRules}') rules = json.loads(response)['rules'] return [self._parseSmartRule(rule) for rule in rules]
[docs] def build_onMeasurementExplicitThresholdCreateAlarm(self, fragment, series, rangeMin=90, rangeMax=100, alarmType='c8y_ThresholdAlarm', alarmText='Threshold exceeded'): """ Build a smart rule object for the rule "On measurement explicit threshold create alarm". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param str fragment: Measurement fragment the smart rule listens for. :param str series: Measurement series the smart rule listens for. :param float rangeMin: The minimum value of measurements for which an alarm is raised. :param float rangeMax: The maximum value of measurements for which an alarm is raised. :param str alarmType: The type of raised alarms. :param str alarmText: The alarm message. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'explicit': True, 'fragment': fragment, 'series': series, 'redRangeMin': rangeMin, 'redRangeMax': rangeMax, 'alarmType': alarmType, 'alarmText': alarmText, } return SmartRule(RULE_EXPLICIT_THRESHOLD, config, self)
[docs] def build_onGeofenceCreateAlarm(self, geofence, triggerAlarmOn='leaving', alarmType='c8y_GeofenceAlarm', alarmSeverity='MAJOR', alarmText='Geofence violation'): """ Build a smart rule object for the rule "On geofence create alarm". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param list[dict] geofence: The polygon that defines the boundaries of a region. It should be a list of dictionaries containing "lng" and "lat" keys. :param str triggerAlarmOn: The reason for triggering the alarm - one of "entering", "leaving", or "both". The default is "leaving". :param str alarmType: The type of raised alarms. :param str alarmSeverity: The severity of raised alarms. :param str alarmText: The alarm message. :return: A smart rule object. :rtype: :class:`SmartRule` """ #triggerAlarmOn can be 'both', 'entering' and 'leaving' config = { 'geofence': geofence, 'triggerAlarmOn': triggerAlarmOn, 'alarmType': alarmType, 'alarmSeverity': alarmSeverity, 'alarmText': alarmText, } return SmartRule(RULE_ON_GEOFENCE_CREATE_ALARM, config, self)
[docs] def build_onMissingMeasurementsCreateAlarm(self, measurementType, timeIntervalMinutes=60, alarmType='c8y_MissingMeasurementsAlarm', alarmSeverity='MAJOR', alarmText='Missing measurements of type: #{type}'): """ Build a smart rule object for the rule "On missing measurements create alarm". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param str measurementType: The type of measurement. :param float timeIntervalMinutes: Time (in minutes) to wait for missing measurements. :param str alarmType: The type of raised alarms. :param str alarmSeverity: The severity of raised alarms. :param str alarmText: The alarm message. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'measurementType': measurementType, 'timePeriod': timeIntervalMinutes, 'alarmType': alarmType, 'alarmSeverity': alarmSeverity, 'alarmText': alarmText, } return SmartRule(RULE_MISSING_MEASUREMENTS, config, self)
[docs] def build_onAlarmSendSMS(self, alarmTypes, smsTo, smsText='Alarm occurred'): """ Build a smart rule object for the rule "On alarm send SMS". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param alarmTypes: The types of alarms that trigger the rule. :type alarmTypes: str or list[str] :param str smsTo: The target phone number for the SMS. :param str smsText: The text of the SMS. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'alarmType': self._seq_to_str(alarmTypes), 'to': smsTo, 'text': smsText, } return SmartRule(RULE_ON_ALARM_SEND_SMS, config, self)
[docs] def build_onAlarmSendEmail(self, alarmTypes, sendTo, sendCC=None, sendBCC=None, replyTo=None, subject='New #{severity} alarm from #{source.name}', message='New #{severity} alarm has been received from #{source.name}. Alarm text is: "#{text}".'): """ Build a smart rule object for the rule "On alarm send email". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param alarmTypes: The types of alarms that trigger the rule. :type alarmTypes: str or list[str] :param str sendTo: The recipients of the email. :param sendCC: The recipients that are to receive a copy of the email. :type sendCC: str, optional :param sendBCC: The recipients that are to receive a blind copy of the email. :type sendBCC: str, optional :param replyTo: The reply-to address for the email. :type replyTo: str, optional :param str subject: The subject of the email. :param str message: The text of the email. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'alarmType': self._seq_to_str(alarmTypes), 'to': sendTo, 'cc': sendCC, 'bcc': sendBCC, 'replyTo': replyTo, 'subject': subject, 'text': message, } return SmartRule(RULE_ON_ALARM_SEND_EMAIL, config, self)
[docs] def build_onAlarmEscalateStepSendEmail(self, sendTo, sendCC=None, sendBCC=None, replyTo=None, subject='New #{severity} alarm from #{source.name}', message='New #{severity} alarm has been received from #{source.name}. Alarm text is: "#{text}".'): """ Build an escalation step to send an email for the rule "On alarm escalate it". :param str sendTo: The recipients of the email. :param sendCC: The recipients that are to receive a copy of the email. :type sendCC: str, optional :param sendBCC: The recipients that are to receive a blind copy of the email. :type sendBCC: str, optional :param replyTo: The reply-to address for the email. :type replyTo: str, optional :param str subject: The subject of the email. :param str message: The text of the email. :return: An escalation step object. """ return self.build_onAlarmSendEmail(alarmTypes='todo', sendTo=sendTo, sendCC=sendCC, sendBCC=sendBCC, replyTo=replyTo, subject=subject, message=message)
[docs] def build_onAlarmEscalateStepSendSMS(self, smsTo, smsText='New #{severity} alarm has been received from #{source.name}. Alarm text is: "#{text}".'): """ Build an escalation step to send an SMS for the rule "On alarm escalate it". :param str smsTo: The target phone number for the SMS. :param str smsText: The text of the SMS. :return: An escalation step object. """ return self.build_onAlarmSendSMS(alarmTypes='todo', smsTo=smsTo,smsText=smsText)
[docs] def build_onAlarmEscalateIt(self, alarmTypes, escalationSteps): """ Build a smart rule object for the rule "On alarm escalate it". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. Call the `build_onAlarmEscalateStepSendEmail` or `build_onAlarmEscalateStepSendSMS` methods to create escalation steps. :param alarmTypes: The types of alarms that trigger the rule. :type alarmTypes: str or list[str] :param list escalationSteps: Escalation steps. :return: :rtype: :class:`SmartRule` """ sub_rules = [] for step in escalationSteps: if not isinstance(step, SmartRule): raise Exception(f'Invalid escalation step provided: {step}') config = step.getConfiguration().copy() config['ruleTemplateName'] = step.ruleType config['alarmType'] = self._seq_to_str(alarmTypes) sub_rules.append(config) return SmartRule(RULE_ON_ALARM_ESCALATE, smartRulesManager=self, configuration={ 'alarmType': self._seq_to_str(alarmTypes), 'subrules': sub_rules, })
[docs] def build_onAlarmDurationIncreaseSeverity(self, alarmTypes, alarmDurationMinutes=1.0): """ Build a smart rule object for the rule "On alarm duration increase severity". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param alarmTypes: The types of alarms that trigger the rule. :type alarmTypes: str or list[str] :param float alarmDurationMinutes: The duration (in minutes) an alarm must be active before increasing the severity. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'alarmType': self._seq_to_str(alarmTypes), 'duration': alarmDurationMinutes * 60.0, } return SmartRule(RULE_INCREASE_ALARM_SEVERITY, config, self)
[docs] def build_onGeofenceSendEmail(self, geofence, sendTo, sendCC=None, sendBCC=None, replyTo=None, subject='New geofence violation from #{source.name}', message='New geofence violation from #{source.name}'): """ Build a smart rule object for the rule "On geofence send email". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param list[dict] geofence: The polygon that defines the boundaries of a region. It should be a list of dictionaries containing "lng" and "lat" keys. :param str sendTo: The recipients of the email. :param sendCC: The recipients that are to receive a copy of the email. :type sendCC: str, optional :param sendBCC: The recipients that are to receive a blind copy of the email. :type sendBCC: str, optional :param replyTo: The reply-to address for the email. :type replyTo: str, optional :param str subject: The subject of the email. :param str message: The text of the email. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'geofence': geofence, 'to': sendTo, 'cc': sendCC, 'bcc': sendBCC, 'replyTo': replyTo, 'subject': subject, 'text': message, } return SmartRule(RULE_ON_GEOFENCE_SEND_EMAIL, config, self)
[docs] def build_calculateEnergyConsumption(self, inputFragment='c8y_EnergyCounter', inputSeries='E', timeIntervalMinutes=1.0, outputFragment='c8y_EnergyConsumption', outputSeries='E'): """ Build a smart rule object for the rule "Calculate energy consumption". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param str inputFragment: The name of the fragment for incoming measurements. :param str inputSeries: The name of the series for incoming measurements. :param float timeIntervalMinutes: The interval for which to calculate consumption values. :param str outputFragment: The name of the fragment for generated measurements. :param str outputSeries: The name of the series for generated measurements. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'inputFragment':inputFragment, 'inputSeries':inputSeries, 'timeInterval':timeIntervalMinutes * 60.0, 'outputFragment':outputFragment, 'outputSeries':outputSeries, } return SmartRule(RULE_CALCULATE_ENERGY, config, self)
[docs] def build_onAlarmExecuteOperation(self, alarmTypes, operation): """ Build a smart rule object for the rule "On alarm execute operation". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param alarmTypes: The types of alarms that trigger the rule. :type alarmTypes: str or list[str] :param dict[str,any] operation: The operation that will be sent. It must be a dictionary representing a JSON description of the operation. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'alarmType': self._seq_to_str(alarmTypes), 'operation': operation, } return SmartRule(RULE_ON_ALARM_EXECUTE_OPERATION, config, self)
[docs] def build_onMeasurementThresholdCreateAlarm(self, dataPointObjectID=None, alarmType='c8y_ThresholdAlarm', alarmText='Thresholds exceeded'): """ Build a smart rule object for the rule "On measurement threshold create alarm". To deploy it to Cumulocity IoT, call the `~SmartRule.deploy` method. :param str dataPointObjectID: The ID of the Data Point Library object to use to find the values for the red and yellow ranges. :param str alarmType: The type of raised alarms. :param str alarmText: The alarm message. :return: A smart rule object. :rtype: :class:`SmartRule` """ config = { 'kpiId': dataPointObjectID, 'alarmType': alarmType, 'alarmText':alarmText, } return SmartRule(RULE_THRESHOLD, config, self)
@staticmethod def _seq_to_str(l): if isinstance(l, list): return ','.join(l) return str(l) def _parseSmartRule(self, dictToParse): """ Create a smart rule object by parsing a response from the server. :param dictToParse: Response received from the server. :return: A smart rule object. :rtype: :class:`SmartRule` """ ruleType = dictToParse['ruleTemplateName'] ruleName = dictToParse['name'] config = dictToParse['config'] enabled = dictToParse['enabled'] enabledSources = dictToParse['enabledSources'] disabledSources = dictToParse['disabledSources'] rule = SmartRule(ruleType, config, self)\ .setRuleName(ruleName, addTestPrefix=False)\ .setEnabled(enabled)\ .setEnabledSources(enabledSources)\ .setDisabledSources(disabledSources) if 'c8y_Context' in dictToParse: managedObject = dictToParse['c8y_Context']['id'] rule.setLocal(managedObject) else: rule.setGlobal() rule._extractAndSaveIds(dictToParse) return rule