#!/usr/bin/python
"""
=======
INFOS
=======
- connect 0.8.1
- python version: 3.5
- @author: Michael Kistler 2015, Livia B 2016
========
CHANGES
========
* changed / added JwT auth
* added models module
"""
from __future__ import print_function
import math
import hashlib
from datetime import datetime
from calendar import timegm
import base64
import urllib
import jwt
try:
from urllib.parse import urlparse
from urllib.parse import quote as urlparse_quote
except ImportError:
from urlparse import urlparse
from urllib import quote as urlparse_quote
import json
from pathlib import Path, PurePath, WindowsPath
import requests
from requests.auth import AuthBase
import io
import base64
import zlib
try:
import lxml.etree as ET
except:
import xml.etree.ElementTree as ET
import models as vsdModels
import logging
logger = logging.getLogger(__name__)
requests.packages.urllib3.disable_warnings()
[docs]class SAMLAuth(AuthBase):
"""Attaches SMAL to the given Request object. extends the request package auth class"""
def __init__(self, enctoken):
self.enctoken = enctoken
def __call__(self, r):
# modify and return the request
r.headers['Authorization'] = b'SAML auth=' + self.enctoken
return r
[docs]def samltoken(fp, stsurl='https://ciam-dev-chic.custodix.com/sts/services/STS'):
"""
generates the saml auth token from a credentials file
:param Path fp: file with the credentials (xml file)
:param str stsurl: url to the STS authority
:return: enctoken - the encoded token
:rtype: byte
"""
if fp.is_file():
tree = ET.ElementTree()
dom = tree.parse(str(fp))
authdata = ET.tostring(dom, encoding='utf-8')
# send the xml in the attachment to https://ciam-dev-chic.custodix.com/sts/services/STS
r = requests.post(stsurl, data=authdata, verify=False)
if r.status_code == 200:
fileobject = io.BytesIO(r.content)
tree = ET.ElementTree()
dom = tree.parse(fileobject)
saml = ET.tostring(dom, method="xml", encoding="utf-8")
# ZLIB (RFC 1950) compress the retrieved SAML token.
ztoken = zlib.compress(saml, 9)
# Base64 (RFC 4648) encode the compressed SAML token.
enctoken = base64.b64encode(ztoken)
return enctoken
else:
return None
[docs]class JWTAuth(AuthBase):
"""Attaches JMT to the given Request object. extends the request package auth class"""
def __init__(self, enctoken):
self.enctoken = enctoken
def __call__(self, r):
# modify and return the request
r.headers['Authorization'] = 'Bearer ' + self.enctoken
return r
[docs]class VSDConnecter(object):
def __init__(
self,
authtype='jwt',
url="https://demo.virtualskeleton.ch/api/",
username="demo@virtualskeleton.ch",
password="demo",
version="",
token=None,
):
self.version = version
self.url = url + version
self.s = requests.Session()
self.s.verify = False
self.authtype = authtype
self.maxAttempts = 3
self.maxAttempts401 = 2
if version:
self.version = str(version) + '/'
if authtype == 'basic':
self.username = username
self.password = password
self.s.auth = (self.username, self.password)
elif authtype == 'saml':
self.token = token
self.s.auth = SAMLAuth(self.token)
elif authtype == 'jwt':
self.username = username
self.password = password
token = self.getJWTtoken()
self.token = token.tokenValue
self.s.auth = JWTAuth(self.token)
####################
#session management
####################
def _validate_exp(self):
"""
checks if the session is still valid
:return: if validation is expired or not
:rtype: bool
:raises: DecodeError
"""
now = timegm(datetime.utcnow().utctimetuple())
if self.authtype == 'jwt':
if not hasattr(self, 'token'):
# I pass here only one time, when I request a token
self.token = None
return True
payload = jwt.decode(self.token, verify=False)
try:
exp = int(payload['exp'])
except ValueError:
raise jwt.DecodeError('Expiration Time claim (exp) must be an'
' integer.')
if exp < now:
# raise jwt.ExpiredSignatureError('Signature has expired')
return False
else:
self.s.auth = JWTAuth(self.token)
return True
else:
return True
def _stayAlive(self):
"""
checks if the token has expired, if yes, request a new token and initiates a new session
"""
if not self._validate_exp():
self.s.auth = JWTAuth(self.getJWTtoken().tokenValue)
[docs] def getJWTtoken(self):
"""
request the JWT token from the server using Basic Auth
:return: token - a authentication token or None
:rtype: Token or None
"""
token = False
try:
res = self.s.get(self.url + 'tokens/jwt', auth=(self.username, self.password), verify=False)
res.raise_for_status()
except:
logger.error(res)
raise
token = vsdModels.Token(**res.json())
try:
payload = jwt.decode(token.tokenValue, verify=False)
except jwt.InvalidTokenError as e:
logger.error('token invalid, try using Basic Auth{0}'.format(e))
raise
return token
#################################################
# requests library wrappers
################################################
def _download(self, url, fp, onlyHeader = False):
r = urlparse(url)
res = self._requestsAttempts(self.s.get, r.geturl(), params=r.params, stream=True)
try:
filename = fp.name # path object
except:
filename = fp # string
with open(filename, 'wb') as f:
for n, chunk in enumerate(res.iter_content(1024)):
f.write(chunk)
if onlyHeader and n > 2:
break
return filename
def _requestsAttempts(self, method, url, *args, **kwargs):
# generic wrapper around request library with multiple attempts
# replaces self._httpResponseCheck(self, response):
# :param method: string of the method to call "get", "put"
# :param url: full url
# :param args: args for request call
# :param kwargs: kwargs for request call
# :return: request object (raise if error after self.maxAttempts)
for i in range(self.maxAttempts):
res = method(url, *args, **kwargs)
try:
self._stayAlive()
res.raise_for_status()
return res
except:
logger.info("Connection attempt %s/%s: %s %s" % (i, self.maxAttempts, res , url))
if res.status_code == 401 and i > self.maxAttempts401:
raise
# re-raise if > max attempts
res.raise_for_status()
def _get(self, resource, *args, **kwargs): # reimplements VSDConnect.getRequest
return self._requestsAttempts(self.s.get, resource, *args, **kwargs).json()
def _put(self, resource, *args, **kwargs): # reimplements VSDConnect.putRequest
return self._requestsAttempts(self.s.put, resource, *args, **kwargs).json()
def _delete(self, resource, *args, **kwargs): # reimplements VSDConnect.postRequest
return self._requestsAttempts(self.s.delete, resource, *args, **kwargs).json()
def _post(self, resource, *args, **kwargs):
# should I avoid multiplt attempts? not idempotent, no multiple attempts
return self._requestsAttempts(self.s.post, resource, *args, **kwargs).json()
def _options(self, resource, *args, **kwargs): # reimplements VSDConnect.getRequest
return self._requestsAttempts(self.s.options, resource, *args, **kwargs).json()
#################################################
# api objects handling
################################################
[docs] def getObjectType(self, response):
"""
create an APIObject depending on the type
:param json response: object data
:return: object
:rtype: APIObject
"""
apiObject = vsdModels.APIObject(**response)
objectType = apiObject.type.name # 'RawImage'
if objectType not in dir(vsdModels):
logger.warning("Unknown type %s" % objectType)
return vsdModels.APIObject
obj = getattr(vsdModels, objectType)
return obj
[docs] def createObject(self, response=None, **kwargs):
"""
please describe the purpose here
convert fields to response dict
how to use it?, cant we use: json.dumps(self.to_struct())
:param bool response: ???
:param **kwargs: ???
"""
if response is None:
response = kwargs
objType = self.getObjectType(response)
return objType(**response)
[docs] def parseUrl(self, resource, type):
"""
check if the resource is int (ID) or the selfURL
:param str resource: url to the resource
:param str type: type of the api resource (folders, objects etc)
"""
try:
rId = int(resource)
resource = "%s/%s" % (type, rId)
except:
pass
assert type in resource
return self.fullUrl(resource)
[docs] def fullUrl(self, resource):
"""
check if resource is selfUrl or relative path. a correct full path will be returned
:param str resource: the api resource path
:return: the full resource path
:rtype: str
"""
res = urlparse(str(resource))
if res.scheme == 'https':
return resource
else:
return self.url + resource
[docs] def optionsRequest(self, resource):
"""
generic options request function
:param str resource: the api resource path
:return: json data result or None
:rtype: json or None
"""
self._stayAlive()
return self._options(resource)
#################################################
# api objects handling (READ)
################################################
[docs] def getRequest(self, resource, rpp=None, page=None, include=None):
"""
generic get request function
:param str resource: resource path
:param int rpp: results per page to show
:param int page: page nr to show, starts with 0
:param str include: option to include more informations
:return: list of objects or None
:rtype: json or None
"""
params = dict([('rpp', rpp), ('page', page), ('include', include)])
return self._get(self.fullUrl(resource), params=params)
[docs] def downloadZip(self, resource, fp):
"""
download the zipfile into the given file (fp)
:param str resource: download URL
:param Path fp: filepath
:return: None or status_code ok (200)
:rtype: int
"""
self._stayAlive()
res = self.s.get(self.fullUrl(resource), stream = True)
if res.ok:
with fp.open('wb') as f:
shutil.copyfileobj(res.raw, f)
return res.status_code
else:
return None
[docs] def downloadObject(self, obj, workingDir=None):
"""
download the object into a ZIP file based on the object name and the working directory
:param APIObject obj: object
:param Path workingDir: workpath, where to store the zip
:return: None or filename
:rtype: str
"""
fp = Path(obj.name).with_suffix('.zip')
if workingDir:
fp = Path(workingDir, fp)
return self._download(self.fullUrl(obj.downloadUrl), fp.name)
[docs] def downloadObjectPreviewImages(self, object, thumbnail=True):
field = 'thumbnailUrl' if thumbnail else 'imageUrl'
embeddedImages = list()
for i, preview in enumerate(object.objectPreviews):
p_obj = vsdModels.Preview(**self.getRequest(preview.selfUrl))
img = self._requestsAttempts(self.s.get, getattr(p_obj, field))
embeddedImages.append(base64.b64encode(img.content))
return embeddedImages
[docs] def getPaginated(self, resource):
"""
get paginated object
"""
res = self.getRequest(resource)
page = vsdModels.Pagination(**res)
return page
[docs] def getAllPaginated(self, resource, itemlist=list()):
"""
returns all items as list
:param str resource: resource path
:param list itemlist: list of items
:return: list of items
:rtype: list of Pagination objects
"""
res = self.getRequest(resource)
page = vsdModels.Pagination(**res)
for item in page.items:
itemlist.append(item)
if page.nextPageUrl:
return self.getAllPaginated(page.nextPageUrl, itemlist=itemlist)
else:
return itemlist
[docs] def iteratePageItems(self, page, func=dict):
"""
returns all items as list
:param str resource: resource path
:param func: function for converting resource
:return: iterator of items
:rtype: list of dict or model object
"""
for item in page.items:
yield func(**item)
if page.nextPageUrl:
res = self.getRequest(page.nextPageUrl)
nextPage = vsdModels.APIPagination(**res)
for nextItem in self.iteratePageItems(nextPage, func=func):
yield nextItem
[docs] def iterateAllPaginated(self, resource, func=dict):
"""
returns all items as list
:param str resource: resource path
:param func: function for converting resource
:return: iterator of items
:rtype: list of dict or model object
"""
res = self.getRequest(resource)
page = vsdModels.APIPagination(**res)
for item in self.iteratePageItems(page, func):
yield item
[docs] def getObjects(self, idList=None):
"""
please describe what you are doing
:param ??? idList: ???
:return:
:rtype:
"""
if idList is None:
idList = ''
if idList in ['', 'published', 'unpublished']:
return self.iterateAllPaginated('objects/%s' % idList, func=self.createObject)
items = []
for curId in idList:
items.append(self.getObject(curId))
return items
[docs] def getOID(self, selfURL):
"""
extracts the last part of the selfURL, tests if it is a number
:param selfURL: (str) url to the object
:return: either None if not an ID or the object ID (int)
:raises: ValueError
"""
selfURL_path = urllib.parse.urlsplit(selfURL).path
oID = Path(selfURL_path).name
try:
r = int(oID)
except ValueError as err:
print('no object ID in the selfUrl {0}. Reason: {1}'.format(selfURL, err))
r = None
return r
[docs] def getResourceTypeAndId(self, url):
"""
please describe what you are doing
:param:
:return:
:rtype:
"""
return url.rsplit('/', 2)[-2:]
def _instantiateResource(self, res):
"""
please describe what you are doing
:param:
:return:
:rtype:
"""
try:
pagination = vsdModels.Pagination(**res)
pagination.validate() #will fail if it doesno't have totalCount
return pagination
except:
resourcetype, id = self.getResourceTypeAndId(res['selfUrl'])
if resourcetype == 'objects':
return self.createObject(res)
model = vsdModels.resourceTypes[resourcetype](**res)
return model
[docs] def getResource(self, url):
"""
please describe what you are doing
:param:
:return:
:rtype:
"""
res = self.getRequest(url)
return self._instantiateResource(res)
[docs] def getObject(self, resource):
"""retrieve an object based on the objectID/selfUrl
:param int,str resource: (str) selfUrl of the object or the (int) object ID
:return: the object
:rtype: APIObject
"""
resource = self.parseUrl(resource, 'objects')
res = self.getRequest(resource)
obj = self.createObject(res)
return obj
[docs] def getFolder(self, resource):
"""retrieve an folder based on the folderID/selfUrl
:param int,str resource: (str) selfUrl of the folder or the (int) folder ID
:return: the folder
:rtype: APIFolder
"""
resource = self.parseUrl(resource, 'folders')
res = self.getRequest(resource)
folder = vsdModels.Folder(**res)
return folder
[docs] def getObjectFilesHash(self, obj):
"""
retrieve the filehash and the anonymized file hash values of a file
:param APIObject f: API object
:return: list of fileHash (uppercase)
:rtype: list of str
"""
filehash = list()
files = self.getObjectFiles(obj)
for f in files:
filehash.append(f.fileHashCode)
return filehash
[docs] def walkFolder(self, folderUrl, topdown=True):
"""
please describe here
:param:
:return:
:rtype:
"""
# similar to os.walk
folderObject = self.getFolder(folderUrl)
dirs = folderObject.childFolders
nondirs = folderObject.containedObjects
if dirs is None:
dirs = []
if nondirs is None:
nondirs = []
if topdown:
yield folderObject, dirs, nondirs
for nextDir in dirs:
for x in self.walkFolder(nextDir.selfUrl):
yield x
if not topdown:
yield folderObject, dirs, nondirs
[docs] def checkFileInObject(self, obj, fp):
"""
check if a local file is part of an object
:param APIObject obj: API object
:param Path fp: file to test
:return: if contained or not
:rtype: bool
"""
containted = False
## Haso of all files
filehash = self.getObjectFilesHash(obj)
## Local hash
BLOCKSIZE = 65536
hasher = hashlib.sha1()
with fp.open('rb') as afile:
buf = afile.read(BLOCKSIZE)
while len(buf) > 0:
hasher.update(buf)
buf = afile.read(BLOCKSIZE)
localhash = hasher.hexdigest()
if localhash.upper() in filehash:
containted = True
return containted
[docs] def searchTerm(self, resource, search, mode='default'):
""" search a resource using oAuths
:param str resouce: resource path
:param str search: term to search for
:param str mode: search for partial match (default) or exact match (exact)
:return: list of folder objects
:rtype: json
"""
search = urlparse_quote(search)
if mode == 'exact':
url = self.fullUrl(resource) + '?$filter=Term%20eq%20%27{0}%27'.format(search)
else:
url = self.fullUrl(resource) + '?$filter=startswith(Term,%27{0}%27)%20eq%20true'.format(search)
req = self.getRequest(url)
#return req.json()
[docs] def getFile(self, resource):
"""
return a APIFile object
:param str resource: resource path
:return: api file object or status code
:rtype: APIFile
"""
resource = self.parseUrl(resource, 'files')
res = self.getRequest(resource)
fObj = vsdModels.File(**res)
return fObj
[docs] def getObjectFiles(self, obj):
"""
return a list of file objects contained in an object
:param APIObject obj: object
:return: list of APIFile
:rtype: list of APIFile
"""
filelist = list()
fileurl = 'objects/{0}/files'.format(obj.id)
fl = self.iterateAllPaginated(fileurl)
#fl = self.getAllPaginated(fileurl)
for f in fl:
res = self.getFile(f['selfUrl'])
filelist.append(res)
return filelist
[docs] def fileObjectVersion(self, data):
"""
Extract VSDID and selfUrl of the related Object Version of the file after file upload
:param json data: file object data
:result: returns the id and the selfUrl of the Object Version
:rtype: str
"""
# data = json.loads(data)
f = data['file']
obj = data['relatedObject']
fSelfUrl = f['selfUrl']
return obj['selfUrl'], self.getOID(obj['selfUrl'])
[docs] def getAllUnpublishedObjects(self, resource='objects/unpublished'):
""" retrieve the unpublished objects as list of APIObject
:param str resource: resource path (eg nextPageUrl) or default groups
:param int rpp: results per page
:param int page: page to display
:return: list of objects
:rtype: APIObjects
"""
objects = list()
for item in self.iterateAllPaginated(resource, vsdModels.APIObject):
obj = self.getObject(item.selfUrl)
objects.append(obj)
return objects
[docs] def getLatestUnpublishedObject(self):
"""
searches the list of unpublished objects and returns the newest object
:return: last uploaded object
:rtype: apiObject
"""
res = self.getRequest('objects/unpublished')
if len(res['items']) > 0:
obj = self.getObject(res['items'][0].get('selfUrl'))
return obj
else:
print('you have no unpublished objects')
return None
[docs] def getFolderByName(self, search, mode='default', squeeze=True):
"""
get a list of folder(s) based on a search string
:param str search: term to search for
:param str mode: search for partial match ('default') or exact match ('exact')
:param bool squeeze: ???? what is squeeze
:return: list of folder objects APIFolders
:rtype: list of APIFolders
"""
search = urlparse_quote(search)
if mode == 'exact':
url = self.url + "folders?$filter=Name%20eq%20%27{0}%27".format(search)
else:
url = self.url + "folders?$filter=startswith(Name,%27{0}%27)%20eq%20true".format(search)
result = list(self.iterateAllPaginated(url, vsdModels.APIFolder))
if len(result) == 1 and squeeze:
folder = result[0]
print('1 folder matching the search found')
return folder
else:
print('list of {} folders matching the search found'.format(len(result)))
return result
[docs] def getContainedFolders(self, folder):
"""
return a list of folder object contained in a folder
:param APIFolder folder: folder object
:return folderlist: a list of folder object (APIFolder) contained in the folder
:rtype: list of APIFolder
"""
folderlist = list()
if folder.childFolders:
for fold in folder.childFolders:
basic = vsdModels.APIBase(**fold)
f = self.getFolder(basic.selfUrl)
folderlist.append(f)
return folderlist
else:
print('the folder does not have any contained folders')
return None
[docs] def getContainedObjects(self, folder):
"""
return a list of object contained in a folder
:param APIFolder folder: folder object
:return objlist: a list of objects (APIFObject) contained in the folder
:rtype: list of APIObject
"""
objlist = list()
if folder.containedObjects:
for obj in folder.containedObjects:
basic = vsdModels.APIBase(**obj)
o = self.getObject(basic.selfUrl)
objlist.append(o)
return objlist
else:
print('the folder does not have any contained objects')
return None
[docs] def getModalityList(self):
"""
retrieve a list of modalities objects (APIModality)
:return: list of available modalities
:rtype: list of Modality
"""
modalities = list()
items = self.iterateAllPaginated('modalities')
if items:
for item in items:
modality = vsdModels.Modality(**item)
modalities.append(modality)
return modalities
[docs] def getModality(self, resource):
""" retrieve a modalities object (APIModality)
:param int,str resource: resource path to the of the modality
:return: the modality object
:rtype: Modality
"""
resource = self.parseUrl(resource, 'modalities')
res = self.getRequest(resource)
return vsdModels.Modality(**res)
[docs] def getFolderContent(self, folder, recursive=False, mode='d'):
"""
get the objects and folder contained in the given folder. can be called recursive to travel and return all objects
:param APIFolder folder: the folder to be read
:param bool recursive: travel the folder structure recursively or not (default)
:param str mode: what to return: only objects (o), only folders (f) or default (d) folders and objects
:return content: dictionary with folders (APIBase) and object (APIBase)
:rtype: dict of APIBase
"""
objectmode = False
foldermode = False
if mode == 'o':
objectmode = True
elif mode == 'f':
foldermode = True
elif mode == 'd':
objectmode = True
foldermode = True
else:
print('mode {0} not supported'.format(mode))
folders = self.getContainedFolders(folder)
temp = dict([('folder', folder), ('object', None)])
if foldermode:
content = list([temp])
else:
content = list()
if objectmode:
objects = self.getContainedObjects(folder)
if objects is not None:
for obj in objects:
temp = dict([('folder', folder), ('object', obj)])
content.append(temp)
if folders is not None:
if recursive:
for fold in folders:
content.extend(self.getFolderContent(fold, mode=mode, recursive=True))
else:
if foldermode:
for fold in folders:
temp = dict([('folder', folder), ('object', None)])
content.append(temp)
return content
[docs] def searchOntologyTerm(self, search, oType='0', mode='default'):
"""
Search ontology term in a single ontology resource. Two modes are available to either find the exact term or based on a partial match
:param str search: string to be searched
:param int oType: ontlogy resouce code, default is FMA (0)
:param str mode: find exact term (exact) or partial match (default)
:returns: a list of ontology objects or a single ontology item
:rtype: APIOntolgy
"""
search = urlparse_quote(search)
if mode == 'exact':
url = self.url + "ontologies/{0}?$filter=Term%20eq%20%27{1}%27".format(oType, search)
else:
url = self.url + "ontologies/{0}?$filter=startswith(Term,%27{1}%27)%20eq%20true".format(oType, search)
res = self._get(url)
if res.status_code == requests.codes.ok:
result = list()
res = res.json()
if len(res['items']) == 1:
onto = vsdModels.Ontology()
onto.set(res['items'][0])
print('1 ontology term matching the search found')
return onto
for item in iter(res['items']):
onto = vsdModels.Ontology()
onto.set(item)
result.append(onto)
return result
else:
return res.status_code
[docs] def getOntologyTermByID(self, oid, oType=0):
"""
Retrieve an ontology entry based on the IRI
:param int oid: Identifier of the entry
:param int oType: Resource type, available resources can be found using the OPTIONS on /api/ontologies). Default resouce is FMA (0)
:return: ontology term entry
:rtype: json
"""
url = "ontologies/{0}/{1}".format(oType, oid)
req = self.getRequest(url)
return req.json()
[docs] def getOntologyItem(self, resource, oType=0):
"""
Retrieve an ontology item object (APIOntology)
:param int,str resource: resource path to the of the ontology item
:param int oType: ontology type
:return onto: the ontology item object
:rtype: Ontology
"""
if isinstance(resource, int):
resource = 'ontology/{0}/{1}'.format(resource, oType)
res = self.getRequest(resource)
onto = vsdModels.Ontology(**res)
return onto
[docs] def getLicenseList(self):
""" retrieve a list of the available licenses (License)
:return: list of available license objects
:rtype: list of License
"""
res = self.getRequest('licenses')
licenses = list()
if res:
for item in iter(res['items']):
lic = vsdModels.License(**item)
licenses.append(lic)
return licenses
[docs] def getLicense(self, resource):
""" retrieve a license (License)
:param int,str resource: resource path to the of the license
:return license: the license object
:rtype: License
"""
if isinstance(resource, int):
resource = 'licenses/{0}'.format(resource)
res = self.getRequest(resource)
if res:
license = vsdModels.License(**res)
return license
else:
return None
[docs] def getObjectRightList(self):
""" retrieve a list of the available base object rights (ObjectRight)
:return: list of object rights
:rtype: list of ObjectRight
"""
res = self.getRequest('object_rights')
permission = list()
if res:
for item in iter(res['items']):
perm = vsdModels.ObjectRight(**item)
permission.append(perm)
return permission
[docs] def getObjectRight(self, resource):
""" retrieve a object rights object (ObjectRight)
:param int,str resource: resource to the permission id (int) or selfurl (str)
:return: perm object
:rtype: ObjectRight
"""
if isinstance(resource, int):
resource = 'object_rights/{0}'.format(resource)
res = self.getRequest(resource)
if res:
perm = vsdModels.ObjectRight()
perm.set(obj=res)
return perm
else:
return None
[docs] def getGroups(self, resource='groups', rpp=None, page=None):
"""get the list of groups
:param str resource: resource path (eg nextPageUrl) or default groups
:param int rpp: results per page
:param int page: page number to display
:return: list of group objects
:rtype: Group
:return: pagination object
:rtype: Pagination
"""
groups = list()
res = self.getRequest(resource, rpp, page)
ppObj = vsdModels.Pagination(**res)
for g in ppObj.items:
group = vsdModels.Group(**g)
groups.append(group)
return groups, ppObj
[docs] def getGroup(self, resource):
""" retrieve a group object (Group)
:param int,str resource: path to the group id (int) or selfUrl (str)
:return: group object
:rtype: Group
"""
if isinstance(resource, int):
resource = 'groups/{0}'.format(resource)
res = self.getRequest(resource)
if res:
group = vsdModels.Group()
group.set(obj=res)
return group
else:
return None
[docs] def getUser(self, resource):
""" retrieve a user object (User)
:param int,str resource: path to the user resource id (int) or selfUrl (str)
:return: user object
:rtype: User
"""
if isinstance(resource, int):
resource = 'users/{0}'.format(resource)
res = self.getRequest(resource)
if res:
user = vsdModels.User(**res)
return user
else:
return None
[docs] def getPermissionSets(self, permset='default'):
"""
get the Object Rights for a permission set
:param str permset: name of the permission set: available are private, protect, default, collaborate, full or a list of permission ids (list)
:return perms: list of object rights objects
:rtype: OjectRight
"""
if permset == 'private':
lperms = list([1])
elif permset == 'protect':
lperms = list([2, 3])
elif permset == 'default':
lperms = list([2, 3, 4])
elif permset == 'collaborate':
lperms = list([2, 3, 4, 5])
elif permset == 'full':
lperms = list([2, 3, 4, 5, 6])
else:
lperms = permset
perms = list()
for pid in lperms:
perms.append(self.getObjectRight(pid))
return perms
[docs] def getObjectGroupRights(self, obj):
"""
get the list of attaced group rights of an object
:param APIObject obj: the object
:return rights: a list of ObjectGroupRights
:rtype: list of APIObjectGroupRight
"""
rights = None
if obj.objectGroupRights:
rights = list()
for item in obj.objectGroupRights:
res = self.getRequest(item['selfUrl'])
right = vsdModels.ObjectGroupRight()
right.set(obj=res)
rights.append(right)
return rights
[docs] def getObjectUserRights(self, obj):
"""
get the list of attaced user rights of an object
:param APIObject obj: the object
:return: a list of ObjectUserRights
:rtype: list APIObjectUserRight
"""
rights = None
if obj.objectUserRights:
rights = list()
for item in obj.objectUserRights:
res = self.getRequest(item['selfUrl'])
right = vsdModels.ObjectUserRight()
right.set(obj=res)
rights.append(right)
return rights
#################################################
# api objects handling (MODIFY)
################################################
[docs] def postRequest(self, resource, data):
"""add data to an object
:param str resource: relative path of the resource or selfUrl
:param json data: data to be added to the resource
:return: the resource object
:rtype: json
:raises: RequestException
"""
return self._post(self.fullUrl(resource), json=data)
[docs] def removeLinks(self, resource):
"""
removes all related item from an object
:param str resource: resouce path url
:return: True if successful or False if failed
:rtype: bool
"""
obj = self.getObject(resource)
if obj.linkedObjectRelations:
for link in self.iteratePageItems(obj.linkedObjectRelations, vsdModels.ObjectLink):
print(link)
self.delRequest(link.selfUrl)
else:
print('nothing to delete, no links available')
[docs] def delRequest(self, resource):
"""
generic delete request
:param str resource: resource path
:return: status_code
:rtype: int
"""
try:
req = self.s.delete(self.fullUrl(resource))
if req.status_code == requests.codes.ok:
print('resource {0} deleted, 200'.format(self.fullUrl(resource)))
return req.status_code
elif req.status_code == requests.codes.no_content:
print('resource {0} deleted, 204'.format(self.fullUrl(resource)))
return req.status_code
else:
print('resource {0} NOT (not existing or other problem) deleted'.format(self.fullUrl(resource)))
return req.status_code
except requests.exceptions.RequestException as err:
print('del request failed:', err)
return
[docs] def delObject(self, obj):
"""
delete an unvalidated object
:param APIObject obj: the object to delete
:return: status_code
:rtype: int
"""
try:
req = self.s.delete(obj.selfUrl)
if req.status_code == requests.codes.ok:
print('object {0} deleted'.format(obj.id))
return req.status_code
else:
return req.status_code
print('not deleted', req.status_code)
except requests.exceptions.RequestException as err:
print('del request failed:', err)
[docs] def chunkedread(self, fp, chunksize):
"""
breaks the file into chunks of chunksize
:param Path fp: the file to chunk
:param int chunksize: size in bytes of the chunk parts
:yields: chunk
"""
with fp.open('rb') as f:
while True:
chunk = f.read(chunksize)
if not chunk:
break
yield (chunk)
[docs] def chunkFileUpload(self, fp, chunksize=1024 * 4096):
"""
upload large files in chunks of max 100 MB size
:param Path fp: the file to upload
:param int chunksize: size in bytes of the chunk parts, default is 4MB
:return: the generated object
:rtype: APIObject
"""
parts = int(math.ceil(fp.stat().st_size / float(chunksize)))
err = False
maxchunksize = 1024 * 1024 * 100
if chunksize >= maxchunksize:
print(
'not uploaded: defined chunksize {0} is bigger than the allowed maximum {1}'.format(chunksize, maxchunksize))
return None
part = 0
for part, chunk in enumerate(self.chunkedread(fp, chunksize),1):
logger.info('({2})uploading part {0} of {1}'.format(part, parts, fp.name))
files = {'file': (str(fp.name), chunk)}
res = self._post(self.fullUrl('/chunked_upload?chunk={0}').format(part), files=files)
print('finish, uploaded part {0} of {1} '.format(part, parts))
res = self._post(self.fullUrl('chunked_upload/commit?filename={0}'.format(fp.name)))
return self.getFile(res['file']['selfUrl']), self.getObject(res['relatedObject']['selfUrl'])
# relObj = res['relatedObject']
# obj = self.getObject(relObj['selfUrl'])
# return obj
[docs] def postFolder(self, parent, name, check=True):
"""
creates the folder with a given name (name) inside a folder (parent) if not already exists
:param Folder parent: the root folder
:param str name: name of the folder which should be created
:param bool check: it we should check if already exist, default = True
:return: the folder object of the generated folder or the existing folder
:rtype: Folder
"""
folder = vsdModels.Folder()
if parent is None:
parent = self.getFolderByName('MyProjects', mode='exact')
folder.parentFolder = vsdModels.APIBase(selfUrl=parent.selfUrl)
folder.name = name
exists = False
if check:
if parent.childFolders:
for child in parent.childFolders:
fold = self.getFolder(child.selfUrl)
if fold is not None:
if fold.name == name:
print('folder {0} already exists, id: {1}'.format(name, fold.id))
exists = True
return fold
else:
print('unexpected error, folder exists but cannot be retrieved')
exists = True
# print(self.postRequest('folders', data = data))
if not exists:
data = folder.to_struct()
# for name, field in folder:
# if name not in data:
# data[name] = None
# print(data)
res = self.postRequest('folders', data=data)
folder.populate(**res)
print('folder {0} created, has id {1}'.format(name, folder.id))
assert folder.name == name
return folder
[docs] def uploadFile(self, filename):
"""
push (post) a file to the server
:param Path filename: the file to be uploaded
:return: the file object containing the related object selfUrl
:rtype: APIObject
"""
try:
data = filename.open(mode='rb').read()
##workaround for file without file extensions
if filename.suffix == '':
filename = filename.with_suffix('.dcm')
files = {'file': (str(filename.name), data)}
except:
print("opening file", filename, "failed, aborting")
return
res = self._post(self.url + 'upload', files=files)
return self.getFile(res['file']['selfUrl']), self.getObject(res['relatedObject']['selfUrl'])
#################################################
# api objects handling (UPDATE)
################################################
[docs] def putObject(self, obj):
"""update an objects information
:param APIObject obj: an APIObject
:return: the updated object
:rtype: APIObject
"""
res = self.putRequest(obj.selfUrl, data=obj.to_struct())
if res:
obj = self.createObject(res)
return obj
else:
return res
[docs] def putRequest(self, resource, data):
""" update data of an object
:param str resource: defines the relative path to the api resource
:param json data: data to be added to the object
:return: the updated object
:rtype: json
"""
try:
req = self._put(self.fullUrl(resource), json=data)
return req
except requests.exceptions.RequestException as err:
print('request failed:', err)
return None
[docs] def postRequestSimple(self, resource):
"""
post (create) a resource
:param str resource: resource path
:return: the resource object
:rtype: json
"""
req = self.s.post(self.fullUrl(resource))
return req.json()
[docs] def putRequestSimple(self, resource):
"""
put (update) a resource
:param str resource: resource path
:return: the resource object
:rtype: json
"""
req = self.s.put(self.fullUrl(resource))
return req.json()
[docs] def publishObject(self, obj):
"""
publisch an unvalidated object
:param APIObject obj: the object to publish
:return: returns the object
:rtype: APIObject
"""
try:
req = self.s.put(obj.selfUrl + '/publish')
if req.status_code == requests.codes.ok:
print('object {0} published'.format(obj.id))
return self.getObject(obj.selfUrl)
except requests.exceptions.RequestException as err:
print('publish request failed:', err)
[docs] def deleteFolderContent(self, folder):
""" delete all content from a folder (Folder)
:param Folder folder: a folder object
:return state: returns true if successful, else False
:rtype: bool
"""
state = False
folder.containedObjects = None
res = self.putRequest('folders', data=folder.to_struct())
[docs] def postObjectRights(self, obj, group, perms, isuser=False):
"""
translate a set of permissions and a group into the appropriate format and add it to the object
.. warning:: DEPRECATED: use postObjectGroupRights or postObjectUserRights!
:param APIObject obj: () the object you want to add the permissions to
:param APIGroup/APIUser group: group object or user object
:param list perms: list of Object Rights (APIObjectRight), use getPermissionSet to retrive the ObjectRights based on the permission sets
:param bool isuser: set True if the groups variable is a user. Default is False
:return: a group or user rights object
:rtype: APIObjectGroupRight,APIObjectUserRight
"""
# creat the dict of rights
rights = list()
for perm in perms:
rights.append(dict([('selfUrl', perm.selfUrl)]))
if isuser:
objRight = vsdModels.ObjectUserRight()
objRight.relatedObject = dict([('selfUrl', obj.selfUrl)])
objRight.relatedRights = rights
objRight.relatedUser = dict([('selfUrl', group.selfUrl)])
res = self.postRequest('object-user-rights', data=objRight.to_struct())
objRight.set(res)
else:
objRight = vsdModels.ObjectGroupRight()
objRight.relatedObject = dict([('selfUrl', obj.selfUrl)])
objRight.relatedRights = rights
objRight.relatedGroup = dict([('selfUrl', group.selfUrl)])
res = self.postRequest('object-group-rights', data=objRight.to_struct())
objRight.set(res)
return objRight
[docs] def postObjectUserRights(self, obj, user, perms):
""" translate a set of permissions and a user into the appropriate format and add it to the object
:param Object obj: the object you want to add the permissions to
:param User user: user object
:param list perms: list of Object Rights (APIObjectRight), use getPermissionSet to retrive the ObjectRights based on the permission sets
:return: user rights object
:rtype: ObjectUserRight
"""
# creat the dict of rights
rights = list()
for perm in perms:
rights.append(dict([('selfUrl', perm.selfUrl)]))
objRight = vsdModels.ObjectUserRight()
objRight.relatedObject = dict([('selfUrl', obj.selfUrl)])
objRight.relatedRights = rights
objRight.relatedUser = dict([('selfUrl', user.selfUrl)])
res = self.postRequest('object-user-rights', data=objRight.to_struct())
objRight.set(res)
return objRight
[docs] def postObjectGroupRights(self, obj, group, perms):
""" translate a set of permissions and a group into the appropriate format and add it to the object
:param Object obj: the object you want to add the permissions to
:param Group group: group object
:param list perms: list of Object Rights (APIObjectRight), use getPermissionSet to retrive the ObjectRights based on the permission sets
:return: group rights object
:rtype: ObjectGroupRight
"""
# creat the dict of rights
rights = list()
for perm in perms:
rights.append(dict([('selfUrl', perm.selfUrl)]))
objRight = vsdModels.ObjectGroupRight()
objRight.relatedObject = dict([('selfUrl', obj.selfUrl)])
objRight.relatedRights = rights
objRight.relatedGroup = dict([('selfUrl', group.selfUrl)])
res = self.postRequest('object-group-rights', data=objRight.to_struct())
objRight.set(res)
return objRight
[docs] def addLink(self, obj1, obj2):
""" add an object link
:param APIBase obj1: a linked object with selfUrl
:param APIBase obj2: a linked object with selfUrl
:return: the created object-link
:rtype: json
"""
link = vsdModels.ObjectLink(object1=obj1, object2=obj2)
link.validate()
return self.postRequest('object-links', data=link.to_struct())
[docs] def addOntologyToObject(self, obj, ontology, pos=0):
""" add an ontoly term to an object
:param APIBase obj: basic object
:param Ontology ontology: ontology object
:param int pos: position of the ontology term, default = 1
:return: returns true if successfully added
:rtype: bool
"""
isset = False
if isinstance(pos, int):
onto = vsdModels.ObjectOntology()
onto.position = pos
onto.object = dict([('selfUrl', obj.selfUrl)])
onto.ontologyItem = dict([('selfUrl', ontology.selfUrl)])
onto.type = ontology.type
res = self.postRequest('object-ontologies/{0}'.format(ontology.type), data=onto.to_struct())
if res:
isset = True
else:
print('position needs to be a number (int)')
return isset
[docs] def deleteFolder(self, folder, recursive=False):
"""remove a folder (Folder)
:param Folder folder: the folder object
:return: True if deleted, False if not
:rtype: bool
"""
state = False
self.deleteFolderContent(folder)
res = self.delRequest(folder.selfUrl)
if res == 200 or res == 204:
state = True
if recursive:
folders = self.getContainedFolders(folder)
for f in folders:
return self.deleteFolder(f, recursive=recursive)
return state
[docs] def createFolderStructure(self, rootfolder, filepath, parents):
"""
creates the folders based on the filepath if not already existing,
starting from the rootfolder
:param Folder rootfolder: the root folder object
:param Path filepath: filepath of the file
:param int parents: number of partent levels to create from file folder
:return: the last folder in the tree
:rtype: Folder
"""
fp = filepath.resolve()
folders = list(fp.parts)
folders.reverse()
##remove file from list
if fp.is_file():
folders.remove(folders[0])
for i in range(parents, len(folders)):
folders.remove(folders[i])
folders.reverse()
fparent = rootfolder
if fparent:
for fname in folders:
fchild = None
if fparent:
if fparent.childFolders:
for child in fparent.childFolders:
fold = self.getFolder(child.selfUrl)
if fold.name == fname:
fchild = fold
if not fchild:
f = vsdModels.Folder()
f.name = fname
f.parentFolder = vsdModels.Folder(selfUrl=fparent.selfUrl)
# f.toJson()
res = self.postRequest('folders', f.to_struct())
fparent.populate(**res)
else:
fparent = fchild
return fparent
else:
print('Root folder does not exist', rootfolder)
# jData = jFolder(folder)
return None
[docs] def addObjectToFolder(self, target, obj):
"""
add an object to the folder
:param Folder target: the target folder
:param Object obj: the object to copy
:return: updated folder
:rtype: Folder
"""
objSelfUrl = vsdModels.APIBase(**obj.to_struct())
if not objSelfUrl in target.containedObjects:
target.containedObjects.append(objSelfUrl)
res = self.putRequest('folders', data=target.to_struct())
target = vsdModels.Folder(**res)
return target
else:
return target
[docs] def removeObjectFromFolder(self, target, obj):
"""
remove an object from the folder
:param APIFolder target: the target folder
:param APIObject obj: the object to remove
:return: updated folder
:rtype: APIFolder
"""
objSelfUrl = dict([('selfUrl', obj.selfUrl)])
objects = target.containedObjects
isset = False
if objects:
if objects.count(objSelfUrl) > 0:
objects.remove(objSelfUrl)
target.containedObjects = objects
res = self.putRequest('folders', data=target.to_struct())
if not isinstance(res, int):
isset = True
else:
print('object not part of that folder')
else:
print('folder containes no objects')
return isset