Source code for apamax.eplapplications.eplapps
## License
# Copyright (c) 2020 Software AG, Darmstadt, Germany and/or its licensors
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
# file except in compliance with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under the
# License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
import os
import urllib
import codecs
from pathlib import Path
[docs]class EPLApps:
"""Class for interacting with Apama EPL Apps in Cumulocity IoT.
:param connection: A C8yConnection object for the connection to the platform.
"""
def __init__(self, connection):
self.connection = connection
[docs] def deploy(self, file, name='', description=None, inactive=False, redeploy=False):
"""
Deploys a local mon file to Apama EPL Apps in Cumulocity IoT.
:param file: Path to local mon file to be deployed as an EPL app.
:param name: Name of the EPL app to be uploaded (optional). By default this will be the name of the mon file being uploaded.
:param description: Description of the EPL app (optional).
:param inactive: Boolean of whether the app should be 'active' (inactive=False) or 'inactive' (inactive=True) when it is deployed.
:param redeploy: Boolean of whether we are overwriting an existing EPL app.
"""
active = not inactive
# Check EPL file specified is valid .mon file:
if not os.path.exists(file):
raise FileNotFoundError(f'Deploy failed. File \'{file}\' not found.')
elif os.path.splitext(file)[1] != '.mon':
raise TypeError(f'Deploy failed. \'{file}\' is not a valid .mon file.')
# Check whether EPL app of that name already exists for tenant:
try:
existingEPLApps = self.getEPLApps()
except Exception as err:
raise OSError(f'Could not deploy EPL app. {err}')
existingAppNames = [app['name'] for app in existingEPLApps]
# If name option not specified, use name of the .mon file specified by default
if name == '':
# Removing file path (directories) up to name of file:
filename = os.path.basename(file)
name = filename[:filename.rfind('.mon')]
if name in existingAppNames:
if redeploy:
try:
updateArgs = {'file': file}
if description is not None: updateArgs['description'] = description
updateArgs['state'] = 'active' if active else 'inactive'
self.update(name, **updateArgs)
return
except Exception as err:
raise OSError(f'Unable to redeploy EPL app \'{name}\'. {err}')
else:
raise FileExistsError(f'Deploy failed. \'{name}\' already exists in Apama EPL Apps.')
try:
file_contents = self.__read_text_withBOM(file)
except Exception as err:
raise IOError(f"Deploy failed. {err}")
try:
body = {
'name': name,
'description': description or '',
'state': 'active' if active else 'inactive',
'contents': file_contents
}
responseBytes = self.connection.do_request_json('POST', '/service/cep/eplfiles', body, useLocationHeaderPostResp=False)
response = json.loads(responseBytes)
if active and len(response['errors']) > 0:
self.delete(name)
errorStrings = []
for error in response['errors']:
errorStrings.append(f"[{os.path.basename(file)}:{error['line']}] {error['text']}")
raise ValueError('\n'.join(errorStrings))
except Exception as err:
raise OSError(f'Unable to deploy EPL app \'{name}\' using POST on {self.connection.base_url}/service/cep/eplfiles.\n{err}')
[docs] def update(self, name, new_name=None, file=None, description=None, state=None):
"""
Updates an EPL app in Cumulocity IoT.
:param name: name of the EPL App to be updated.
:param new_name: the updated name of the EPL app (optional)
:param file: path to the local mon file containing the updated contents of the EPL app (optional)
:param description: the updated description of the EPL app (optional)
:param state: the updated state of the EPL app (optional)
"""
if new_name is None and file is None and description is None and state is None:
raise ValueError(f"Update failed. Please specify at least 1 field to update.")
try:
appId = self.getAppId(name)
except FileNotFoundError as err:
raise FileNotFoundError(f'Update failed. {err}')
except Exception as err:
raise OSError(f'Update failed. {err}')
body = {}
if new_name is not None:
body['name'] = new_name
if description is not None:
body['description'] = description
if state is not None:
if state.lower() in ('active', 'inactive'):
body['state'] = state.lower()
else:
raise ValueError(f'Update failed. Invalid argument, \'{state}\', specified for the --state option. State can either be \'active\' or \'inactive\'.')
if file is not None:
# Check file is valid:
if not os.path.exists(file):
raise FileNotFoundError(f'Update failed. File \'{file}\' not found.')
elif os.path.splitext(file)[1] != '.mon':
raise TypeError(f'Update failed. \'{file}\' is not a valid .mon file.')
try:
contents = self.__read_text_withBOM(file)
except Exception as err:
raise IOError(f"Update failed. {err}")
body['contents'] = contents
try:
responseBytes = self.connection.do_request_json('PUT', f'/service/cep/eplfiles/{appId}', body)
response = json.loads(responseBytes)
if len(response['errors']) > 0:
errorStrings = []
for error in response['errors']:
errorStrings.append(f"[{response['name']}:{error['line']}] {error['text']}")
raise ValueError('\n'.join(errorStrings))
except Exception as err:
raise ConnectionError(f'Unable to update EPL app \'{name}\' using PUT on {self.connection.base_url}/service/cep/eplfiles/{appId}.\n{err}')
[docs] def getAppId(self, appName: str, jsonEPLAppsList=None):
"""
Gets the id of EPL app for a given EPL app name. If no EPL app with appname exists, an exception is raised.
:param appName: The name of the EPL app we wish to get the id of
:param jsonEPLAppsList: A json collection of EPL apps
:return: The id of the EPL app
"""
jsonEPLAppsList = jsonEPLAppsList or self.getEPLApps()
for app in jsonEPLAppsList:
if app['name'] == appName:
return app['id']
raise FileNotFoundError(f'EPL app \'{appName}\' not found.')
[docs] def getEPLApps(self, includeContents=False):
"""
:param includeContents: Fetches the EPL files with their contents if True. This is an optional query parameter.
:return: A json object of all the user's EPL apps in Cumulocity IoT.
"""
try:
return self.connection.do_get(f'/service/cep/eplfiles?contents={includeContents}')['eplfiles']
except Exception as err:
raise OSError(f'GET on {self.connection.base_url}/service/cep/eplfiles failed. {err}')
[docs] def delete(self, name: str):
"""
Deletes an EPL app in Cumulocity IoT.
:param name: The name of the EPL app to be deleted.
"""
try:
appId = self.getAppId(name)
except FileNotFoundError as err:
raise FileNotFoundError(f'Delete failed. {err}')
except Exception as err:
raise OSError(f'Delete failed. GET on {self.connection.base_url}/service/cep/eplfiles failed. {err}')
try:
self.connection.request('DELETE', f'/service/cep/eplfiles/{appId}')
except Exception as err:
raise OSError(f'Unable to delete EPL app \'{name}\' using DELETE on {self.connection.base_url}/service/cep/eplfiles. {err}')
def __read_text_withBOM(self, path):
"""
Thin wrapper for Path(<path>).read_text() . It assumes the file is UTF-8 encoded if it starts with the UTF-8 BOM, despite the current locale.
This method is used internally to make the tool behave consistently with many text editors and IDEs on Windows, which also honour the UTF-8 BOM.
Such a file is rendered correctly, therefore it should also be deployed correctly, else user expectations are confounded.
:param path: The path to extract the text from
"""
if Path(path).read_bytes().startswith(codecs.BOM_UTF8):
return Path(path).read_text(encoding="utf8")
else:
return Path(path).read_text()