Source code for connect


- connect 0.8.1
- python version: 3.5
- @author: Michael Kistler 2015, Livia B 2016


* changed / added JwT auth
* added models module 


from __future__ import print_function

import math
import hashlib

from datetime import datetime
from calendar import timegm
import base64

import urllib
import jwt

    from urllib.parse import urlparse
    from urllib.parse import quote as urlparse_quote
except ImportError:
    from urlparse import urlparse
    from urllib import quote as urlparse_quote

import json

from pathlib import Path, PurePath, WindowsPath
import requests
from requests.auth import AuthBase

import io
import base64
import zlib

    import lxml.etree as ET
    import xml.etree.ElementTree as ET

import models as vsdModels
import logging

logger = logging.getLogger(__name__)


[docs]class SAMLAuth(AuthBase): """Attaches SMAL to the given Request object. extends the request package auth class""" def __init__(self, enctoken): self.enctoken = enctoken def __call__(self, r): # modify and return the request r.headers['Authorization'] = b'SAML auth=' + self.enctoken return r
[docs]def samltoken(fp, stsurl=''): """ generates the saml auth token from a credentials file :param Path fp: file with the credentials (xml file) :param str stsurl: url to the STS authority :return: enctoken - the encoded token :rtype: byte """ if fp.is_file(): tree = ET.ElementTree() dom = tree.parse(str(fp)) authdata = ET.tostring(dom, encoding='utf-8') # send the xml in the attachment to r =, data=authdata, verify=False) if r.status_code == 200: fileobject = io.BytesIO(r.content) tree = ET.ElementTree() dom = tree.parse(fileobject) saml = ET.tostring(dom, method="xml", encoding="utf-8") # ZLIB (RFC 1950) compress the retrieved SAML token. ztoken = zlib.compress(saml, 9) # Base64 (RFC 4648) encode the compressed SAML token. enctoken = base64.b64encode(ztoken) return enctoken else: return None
[docs]class JWTAuth(AuthBase): """Attaches JMT to the given Request object. extends the request package auth class""" def __init__(self, enctoken): self.enctoken = enctoken def __call__(self, r): # modify and return the request r.headers['Authorization'] = 'Bearer ' + self.enctoken return r
[docs]class VSDConnecter(object): def __init__( self, authtype='jwt', url="", username="", password="demo", version="", token=None, ): self.version = version self.url = url + version self.s = requests.Session() self.s.verify = False self.authtype = authtype self.maxAttempts = 3 self.maxAttempts401 = 2 if version: self.version = str(version) + '/' if authtype == 'basic': self.username = username self.password = password self.s.auth = (self.username, self.password) elif authtype == 'saml': self.token = token self.s.auth = SAMLAuth(self.token) elif authtype == 'jwt': self.username = username self.password = password token = self.getJWTtoken() self.token = token.tokenValue self.s.auth = JWTAuth(self.token) #################### #session management #################### def _validate_exp(self): """ checks if the session is still valid :return: if validation is expired or not :rtype: bool :raises: DecodeError """ now = timegm(datetime.utcnow().utctimetuple()) if self.authtype == 'jwt': if not hasattr(self, 'token'): # I pass here only one time, when I request a token self.token = None return True payload = jwt.decode(self.token, verify=False) try: exp = int(payload['exp']) except ValueError: raise jwt.DecodeError('Expiration Time claim (exp) must be an' ' integer.') if exp < now: # raise jwt.ExpiredSignatureError('Signature has expired') return False else: self.s.auth = JWTAuth(self.token) return True else: return True def _stayAlive(self): """ checks if the token has expired, if yes, request a new token and initiates a new session """ if not self._validate_exp(): self.s.auth = JWTAuth(self.getJWTtoken().tokenValue)
[docs] def getJWTtoken(self): """ request the JWT token from the server using Basic Auth :return: token - a authentication token or None :rtype: Token or None """ token = False try: res = self.s.get(self.url + 'tokens/jwt', auth=(self.username, self.password), verify=False) res.raise_for_status() except: logger.error(res) raise token = vsdModels.Token(**res.json()) try: payload = jwt.decode(token.tokenValue, verify=False) except jwt.InvalidTokenError as e: logger.error('token invalid, try using Basic Auth{0}'.format(e)) raise return token
################################################# # requests library wrappers ################################################ def _download(self, url, fp, onlyHeader = False): r = urlparse(url) res = self._requestsAttempts(self.s.get, r.geturl(), params=r.params, stream=True) try: filename = # path object except: filename = fp # string with open(filename, 'wb') as f: for n, chunk in enumerate(res.iter_content(1024)): f.write(chunk) if onlyHeader and n > 2: break return filename def _requestsAttempts(self, method, url, *args, **kwargs): # generic wrapper around request library with multiple attempts # replaces self._httpResponseCheck(self, response): # :param method: string of the method to call "get", "put" # :param url: full url # :param args: args for request call # :param kwargs: kwargs for request call # :return: request object (raise if error after self.maxAttempts) for i in range(self.maxAttempts): res = method(url, *args, **kwargs) try: self._stayAlive() res.raise_for_status() return res except:"Connection attempt %s/%s: %s %s" % (i, self.maxAttempts, res , url)) if res.status_code == 401 and i > self.maxAttempts401: raise # re-raise if > max attempts res.raise_for_status() def _get(self, resource, *args, **kwargs): # reimplements VSDConnect.getRequest return self._requestsAttempts(self.s.get, resource, *args, **kwargs).json() def _put(self, resource, *args, **kwargs): # reimplements VSDConnect.putRequest return self._requestsAttempts(self.s.put, resource, *args, **kwargs).json() def _delete(self, resource, *args, **kwargs): # reimplements VSDConnect.postRequest return self._requestsAttempts(self.s.delete, resource, *args, **kwargs).json() def _post(self, resource, *args, **kwargs): # should I avoid multiplt attempts? not idempotent, no multiple attempts return self._requestsAttempts(, resource, *args, **kwargs).json() def _options(self, resource, *args, **kwargs): # reimplements VSDConnect.getRequest return self._requestsAttempts(self.s.options, resource, *args, **kwargs).json() ################################################# # api objects handling ################################################
[docs] def getObjectType(self, response): """ create an APIObject depending on the type :param json response: object data :return: object :rtype: APIObject """ apiObject = vsdModels.APIObject(**response) objectType = # 'RawImage' if objectType not in dir(vsdModels): logger.warning("Unknown type %s" % objectType) return vsdModels.APIObject obj = getattr(vsdModels, objectType) return obj
[docs] def createObject(self, response=None, **kwargs): """ please describe the purpose here convert fields to response dict how to use it?, cant we use: json.dumps(self.to_struct()) :param bool response: ??? :param **kwargs: ??? """ if response is None: response = kwargs objType = self.getObjectType(response) return objType(**response)
[docs] def parseUrl(self, resource, type): """ check if the resource is int (ID) or the selfURL :param str resource: url to the resource :param str type: type of the api resource (folders, objects etc) """ try: rId = int(resource) resource = "%s/%s" % (type, rId) except: pass assert type in resource return self.fullUrl(resource)
[docs] def fullUrl(self, resource): """ check if resource is selfUrl or relative path. a correct full path will be returned :param str resource: the api resource path :return: the full resource path :rtype: str """ res = urlparse(str(resource)) if res.scheme == 'https': return resource else: return self.url + resource
[docs] def optionsRequest(self, resource): """ generic options request function :param str resource: the api resource path :return: json data result or None :rtype: json or None """ self._stayAlive() return self._options(resource)
################################################# # api objects handling (READ) ################################################
[docs] def getRequest(self, resource, rpp=None, page=None, include=None): """ generic get request function :param str resource: resource path :param int rpp: results per page to show :param int page: page nr to show, starts with 0 :param str include: option to include more informations :return: list of objects or None :rtype: json or None """ params = dict([('rpp', rpp), ('page', page), ('include', include)]) return self._get(self.fullUrl(resource), params=params)
[docs] def downloadZip(self, resource, fp): """ download the zipfile into the given file (fp) :param str resource: download URL :param Path fp: filepath :return: None or status_code ok (200) :rtype: int """ self._stayAlive() res = self.s.get(self.fullUrl(resource), stream = True) if res.ok: with'wb') as f: shutil.copyfileobj(res.raw, f) return res.status_code else: return None
[docs] def downloadObject(self, obj, workingDir=None): """ download the object into a ZIP file based on the object name and the working directory :param APIObject obj: object :param Path workingDir: workpath, where to store the zip :return: None or filename :rtype: str """ fp = Path('.zip') if workingDir: fp = Path(workingDir, fp) return self._download(self.fullUrl(obj.downloadUrl),
[docs] def downloadObjectPreviewImages(self, object, thumbnail=True): field = 'thumbnailUrl' if thumbnail else 'imageUrl' embeddedImages = list() for i, preview in enumerate(object.objectPreviews): p_obj = vsdModels.Preview(**self.getRequest(preview.selfUrl)) img = self._requestsAttempts(self.s.get, getattr(p_obj, field)) embeddedImages.append(base64.b64encode(img.content)) return embeddedImages
[docs] def getPaginated(self, resource): """ get paginated object """ res = self.getRequest(resource) page = vsdModels.Pagination(**res) return page
[docs] def getAllPaginated(self, resource, itemlist=list()): """ returns all items as list :param str resource: resource path :param list itemlist: list of items :return: list of items :rtype: list of Pagination objects """ res = self.getRequest(resource) page = vsdModels.Pagination(**res) for item in page.items: itemlist.append(item) if page.nextPageUrl: return self.getAllPaginated(page.nextPageUrl, itemlist=itemlist) else: return itemlist
[docs] def iteratePageItems(self, page, func=dict): """ returns all items as list :param str resource: resource path :param func: function for converting resource :return: iterator of items :rtype: list of dict or model object """ for item in page.items: yield func(**item) if page.nextPageUrl: res = self.getRequest(page.nextPageUrl) nextPage = vsdModels.APIPagination(**res) for nextItem in self.iteratePageItems(nextPage, func=func): yield nextItem
[docs] def iterateAllPaginated(self, resource, func=dict): """ returns all items as list :param str resource: resource path :param func: function for converting resource :return: iterator of items :rtype: list of dict or model object """ res = self.getRequest(resource) page = vsdModels.APIPagination(**res) for item in self.iteratePageItems(page, func): yield item
[docs] def getObjects(self, idList=None): """ please describe what you are doing :param ??? idList: ??? :return: :rtype: """ if idList is None: idList = '' if idList in ['', 'published', 'unpublished']: return self.iterateAllPaginated('objects/%s' % idList, func=self.createObject) items = [] for curId in idList: items.append(self.getObject(curId)) return items
[docs] def getOID(self, selfURL): """ extracts the last part of the selfURL, tests if it is a number :param selfURL: (str) url to the object :return: either None if not an ID or the object ID (int) :raises: ValueError """ selfURL_path = urllib.parse.urlsplit(selfURL).path oID = Path(selfURL_path).name try: r = int(oID) except ValueError as err: print('no object ID in the selfUrl {0}. Reason: {1}'.format(selfURL, err)) r = None return r
[docs] def getResourceTypeAndId(self, url): """ please describe what you are doing :param: :return: :rtype: """ return url.rsplit('/', 2)[-2:]
def _instantiateResource(self, res): """ please describe what you are doing :param: :return: :rtype: """ try: pagination = vsdModels.Pagination(**res) pagination.validate() #will fail if it doesno't have totalCount return pagination except: resourcetype, id = self.getResourceTypeAndId(res['selfUrl']) if resourcetype == 'objects': return self.createObject(res) model = vsdModels.resourceTypes[resourcetype](**res) return model
[docs] def getResource(self, url): """ please describe what you are doing :param: :return: :rtype: """ res = self.getRequest(url) return self._instantiateResource(res)
[docs] def getObject(self, resource): """retrieve an object based on the objectID/selfUrl :param int,str resource: (str) selfUrl of the object or the (int) object ID :return: the object :rtype: APIObject """ resource = self.parseUrl(resource, 'objects') res = self.getRequest(resource) obj = self.createObject(res) return obj
[docs] def getFolder(self, resource): """retrieve an folder based on the folderID/selfUrl :param int,str resource: (str) selfUrl of the folder or the (int) folder ID :return: the folder :rtype: APIFolder """ resource = self.parseUrl(resource, 'folders') res = self.getRequest(resource) folder = vsdModels.Folder(**res) return folder
[docs] def getObjectFilesHash(self, obj): """ retrieve the filehash and the anonymized file hash values of a file :param APIObject f: API object :return: list of fileHash (uppercase) :rtype: list of str """ filehash = list() files = self.getObjectFiles(obj) for f in files: filehash.append(f.fileHashCode) return filehash
[docs] def walkFolder(self, folderUrl, topdown=True): """ please describe here :param: :return: :rtype: """ # similar to os.walk folderObject = self.getFolder(folderUrl) dirs = folderObject.childFolders nondirs = folderObject.containedObjects if dirs is None: dirs = [] if nondirs is None: nondirs = [] if topdown: yield folderObject, dirs, nondirs for nextDir in dirs: for x in self.walkFolder(nextDir.selfUrl): yield x if not topdown: yield folderObject, dirs, nondirs
[docs] def checkFileInObject(self, obj, fp): """ check if a local file is part of an object :param APIObject obj: API object :param Path fp: file to test :return: if contained or not :rtype: bool """ containted = False ## Haso of all files filehash = self.getObjectFilesHash(obj) ## Local hash BLOCKSIZE = 65536 hasher = hashlib.sha1() with'rb') as afile: buf = while len(buf) > 0: hasher.update(buf) buf = localhash = hasher.hexdigest() if localhash.upper() in filehash: containted = True return containted
[docs] def searchTerm(self, resource, search, mode='default'): """ search a resource using oAuths :param str resouce: resource path :param str search: term to search for :param str mode: search for partial match (default) or exact match (exact) :return: list of folder objects :rtype: json """ search = urlparse_quote(search) if mode == 'exact': url = self.fullUrl(resource) + '?$filter=Term%20eq%20%27{0}%27'.format(search) else: url = self.fullUrl(resource) + '?$filter=startswith(Term,%27{0}%27)%20eq%20true'.format(search) req = self.getRequest(url)
#return req.json()
[docs] def getFile(self, resource): """ return a APIFile object :param str resource: resource path :return: api file object or status code :rtype: APIFile """ resource = self.parseUrl(resource, 'files') res = self.getRequest(resource) fObj = vsdModels.File(**res) return fObj
[docs] def getObjectFiles(self, obj): """ return a list of file objects contained in an object :param APIObject obj: object :return: list of APIFile :rtype: list of APIFile """ filelist = list() fileurl = 'objects/{0}/files'.format( fl = self.iterateAllPaginated(fileurl) #fl = self.getAllPaginated(fileurl) for f in fl: res = self.getFile(f['selfUrl']) filelist.append(res) return filelist
[docs] def fileObjectVersion(self, data): """ Extract VSDID and selfUrl of the related Object Version of the file after file upload :param json data: file object data :result: returns the id and the selfUrl of the Object Version :rtype: str """ # data = json.loads(data) f = data['file'] obj = data['relatedObject'] fSelfUrl = f['selfUrl'] return obj['selfUrl'], self.getOID(obj['selfUrl'])
[docs] def getAllUnpublishedObjects(self, resource='objects/unpublished'): """ retrieve the unpublished objects as list of APIObject :param str resource: resource path (eg nextPageUrl) or default groups :param int rpp: results per page :param int page: page to display :return: list of objects :rtype: APIObjects """ objects = list() for item in self.iterateAllPaginated(resource, vsdModels.APIObject): obj = self.getObject(item.selfUrl) objects.append(obj) return objects
[docs] def getLatestUnpublishedObject(self): """ searches the list of unpublished objects and returns the newest object :return: last uploaded object :rtype: apiObject """ res = self.getRequest('objects/unpublished') if len(res['items']) > 0: obj = self.getObject(res['items'][0].get('selfUrl')) return obj else: print('you have no unpublished objects') return None
[docs] def getFolderByName(self, search, mode='default', squeeze=True): """ get a list of folder(s) based on a search string :param str search: term to search for :param str mode: search for partial match ('default') or exact match ('exact') :param bool squeeze: ???? what is squeeze :return: list of folder objects APIFolders :rtype: list of APIFolders """ search = urlparse_quote(search) if mode == 'exact': url = self.url + "folders?$filter=Name%20eq%20%27{0}%27".format(search) else: url = self.url + "folders?$filter=startswith(Name,%27{0}%27)%20eq%20true".format(search) result = list(self.iterateAllPaginated(url, vsdModels.APIFolder)) if len(result) == 1 and squeeze: folder = result[0] print('1 folder matching the search found') return folder else: print('list of {} folders matching the search found'.format(len(result))) return result
[docs] def getContainedFolders(self, folder): """ return a list of folder object contained in a folder :param APIFolder folder: folder object :return folderlist: a list of folder object (APIFolder) contained in the folder :rtype: list of APIFolder """ folderlist = list() if folder.childFolders: for fold in folder.childFolders: basic = vsdModels.APIBase(**fold) f = self.getFolder(basic.selfUrl) folderlist.append(f) return folderlist else: print('the folder does not have any contained folders') return None
[docs] def getContainedObjects(self, folder): """ return a list of object contained in a folder :param APIFolder folder: folder object :return objlist: a list of objects (APIFObject) contained in the folder :rtype: list of APIObject """ objlist = list() if folder.containedObjects: for obj in folder.containedObjects: basic = vsdModels.APIBase(**obj) o = self.getObject(basic.selfUrl) objlist.append(o) return objlist else: print('the folder does not have any contained objects') return None
[docs] def getModalityList(self): """ retrieve a list of modalities objects (APIModality) :return: list of available modalities :rtype: list of Modality """ modalities = list() items = self.iterateAllPaginated('modalities') if items: for item in items: modality = vsdModels.Modality(**item) modalities.append(modality) return modalities
[docs] def getModality(self, resource): """ retrieve a modalities object (APIModality) :param int,str resource: resource path to the of the modality :return: the modality object :rtype: Modality """ resource = self.parseUrl(resource, 'modalities') res = self.getRequest(resource) return vsdModels.Modality(**res)
[docs] def getFolderContent(self, folder, recursive=False, mode='d'): """ get the objects and folder contained in the given folder. can be called recursive to travel and return all objects :param APIFolder folder: the folder to be read :param bool recursive: travel the folder structure recursively or not (default) :param str mode: what to return: only objects (o), only folders (f) or default (d) folders and objects :return content: dictionary with folders (APIBase) and object (APIBase) :rtype: dict of APIBase """ objectmode = False foldermode = False if mode == 'o': objectmode = True elif mode == 'f': foldermode = True elif mode == 'd': objectmode = True foldermode = True else: print('mode {0} not supported'.format(mode)) folders = self.getContainedFolders(folder) temp = dict([('folder', folder), ('object', None)]) if foldermode: content = list([temp]) else: content = list() if objectmode: objects = self.getContainedObjects(folder) if objects is not None: for obj in objects: temp = dict([('folder', folder), ('object', obj)]) content.append(temp) if folders is not None: if recursive: for fold in folders: content.extend(self.getFolderContent(fold, mode=mode, recursive=True)) else: if foldermode: for fold in folders: temp = dict([('folder', folder), ('object', None)]) content.append(temp) return content
[docs] def searchOntologyTerm(self, search, oType='0', mode='default'): """ Search ontology term in a single ontology resource. Two modes are available to either find the exact term or based on a partial match :param str search: string to be searched :param int oType: ontlogy resouce code, default is FMA (0) :param str mode: find exact term (exact) or partial match (default) :returns: a list of ontology objects or a single ontology item :rtype: APIOntolgy """ search = urlparse_quote(search) if mode == 'exact': url = self.url + "ontologies/{0}?$filter=Term%20eq%20%27{1}%27".format(oType, search) else: url = self.url + "ontologies/{0}?$filter=startswith(Term,%27{1}%27)%20eq%20true".format(oType, search) res = self._get(url) if res.status_code == result = list() res = res.json() if len(res['items']) == 1: onto = vsdModels.Ontology() onto.set(res['items'][0]) print('1 ontology term matching the search found') return onto for item in iter(res['items']): onto = vsdModels.Ontology() onto.set(item) result.append(onto) return result else: return res.status_code
[docs] def getOntologyTermByID(self, oid, oType=0): """ Retrieve an ontology entry based on the IRI :param int oid: Identifier of the entry :param int oType: Resource type, available resources can be found using the OPTIONS on /api/ontologies). Default resouce is FMA (0) :return: ontology term entry :rtype: json """ url = "ontologies/{0}/{1}".format(oType, oid) req = self.getRequest(url) return req.json()
[docs] def getOntologyItem(self, resource, oType=0): """ Retrieve an ontology item object (APIOntology) :param int,str resource: resource path to the of the ontology item :param int oType: ontology type :return onto: the ontology item object :rtype: Ontology """ if isinstance(resource, int): resource = 'ontology/{0}/{1}'.format(resource, oType) res = self.getRequest(resource) onto = vsdModels.Ontology(**res) return onto
[docs] def getLicenseList(self): """ retrieve a list of the available licenses (License) :return: list of available license objects :rtype: list of License """ res = self.getRequest('licenses') licenses = list() if res: for item in iter(res['items']): lic = vsdModels.License(**item) licenses.append(lic) return licenses
[docs] def getLicense(self, resource): """ retrieve a license (License) :param int,str resource: resource path to the of the license :return license: the license object :rtype: License """ if isinstance(resource, int): resource = 'licenses/{0}'.format(resource) res = self.getRequest(resource) if res: license = vsdModels.License(**res) return license else: return None
[docs] def getObjectRightList(self): """ retrieve a list of the available base object rights (ObjectRight) :return: list of object rights :rtype: list of ObjectRight """ res = self.getRequest('object_rights') permission = list() if res: for item in iter(res['items']): perm = vsdModels.ObjectRight(**item) permission.append(perm) return permission
[docs] def getObjectRight(self, resource): """ retrieve a object rights object (ObjectRight) :param int,str resource: resource to the permission id (int) or selfurl (str) :return: perm object :rtype: ObjectRight """ if isinstance(resource, int): resource = 'object_rights/{0}'.format(resource) res = self.getRequest(resource) if res: perm = vsdModels.ObjectRight() perm.set(obj=res) return perm else: return None
[docs] def getGroups(self, resource='groups', rpp=None, page=None): """get the list of groups :param str resource: resource path (eg nextPageUrl) or default groups :param int rpp: results per page :param int page: page number to display :return: list of group objects :rtype: Group :return: pagination object :rtype: Pagination """ groups = list() res = self.getRequest(resource, rpp, page) ppObj = vsdModels.Pagination(**res) for g in ppObj.items: group = vsdModels.Group(**g) groups.append(group) return groups, ppObj
[docs] def getGroup(self, resource): """ retrieve a group object (Group) :param int,str resource: path to the group id (int) or selfUrl (str) :return: group object :rtype: Group """ if isinstance(resource, int): resource = 'groups/{0}'.format(resource) res = self.getRequest(resource) if res: group = vsdModels.Group() group.set(obj=res) return group else: return None
[docs] def getUser(self, resource): """ retrieve a user object (User) :param int,str resource: path to the user resource id (int) or selfUrl (str) :return: user object :rtype: User """ if isinstance(resource, int): resource = 'users/{0}'.format(resource) res = self.getRequest(resource) if res: user = vsdModels.User(**res) return user else: return None
[docs] def getPermissionSets(self, permset='default'): """ get the Object Rights for a permission set :param str permset: name of the permission set: available are private, protect, default, collaborate, full or a list of permission ids (list) :return perms: list of object rights objects :rtype: OjectRight """ if permset == 'private': lperms = list([1]) elif permset == 'protect': lperms = list([2, 3]) elif permset == 'default': lperms = list([2, 3, 4]) elif permset == 'collaborate': lperms = list([2, 3, 4, 5]) elif permset == 'full': lperms = list([2, 3, 4, 5, 6]) else: lperms = permset perms = list() for pid in lperms: perms.append(self.getObjectRight(pid)) return perms
[docs] def getObjectGroupRights(self, obj): """ get the list of attaced group rights of an object :param APIObject obj: the object :return rights: a list of ObjectGroupRights :rtype: list of APIObjectGroupRight """ rights = None if obj.objectGroupRights: rights = list() for item in obj.objectGroupRights: res = self.getRequest(item['selfUrl']) right = vsdModels.ObjectGroupRight() right.set(obj=res) rights.append(right) return rights
[docs] def getObjectUserRights(self, obj): """ get the list of attaced user rights of an object :param APIObject obj: the object :return: a list of ObjectUserRights :rtype: list APIObjectUserRight """ rights = None if obj.objectUserRights: rights = list() for item in obj.objectUserRights: res = self.getRequest(item['selfUrl']) right = vsdModels.ObjectUserRight() right.set(obj=res) rights.append(right) return rights
################################################# # api objects handling (MODIFY) ################################################
[docs] def postRequest(self, resource, data): """add data to an object :param str resource: relative path of the resource or selfUrl :param json data: data to be added to the resource :return: the resource object :rtype: json :raises: RequestException """ return self._post(self.fullUrl(resource), json=data)
[docs] def delRequest(self, resource): """ generic delete request :param str resource: resource path :return: status_code :rtype: int """ try: req = self.s.delete(self.fullUrl(resource)) if req.status_code == print('resource {0} deleted, 200'.format(self.fullUrl(resource))) return req.status_code elif req.status_code == print('resource {0} deleted, 204'.format(self.fullUrl(resource))) return req.status_code else: print('resource {0} NOT (not existing or other problem) deleted'.format(self.fullUrl(resource))) return req.status_code except requests.exceptions.RequestException as err: print('del request failed:', err) return
[docs] def delObject(self, obj): """ delete an unvalidated object :param APIObject obj: the object to delete :return: status_code :rtype: int """ try: req = self.s.delete(obj.selfUrl) if req.status_code == print('object {0} deleted'.format( return req.status_code else: return req.status_code print('not deleted', req.status_code) except requests.exceptions.RequestException as err: print('del request failed:', err)
[docs] def chunkedread(self, fp, chunksize): """ breaks the file into chunks of chunksize :param Path fp: the file to chunk :param int chunksize: size in bytes of the chunk parts :yields: chunk """ with'rb') as f: while True: chunk = if not chunk: break yield (chunk)
[docs] def chunkFileUpload(self, fp, chunksize=1024 * 4096): """ upload large files in chunks of max 100 MB size :param Path fp: the file to upload :param int chunksize: size in bytes of the chunk parts, default is 4MB :return: the generated object :rtype: APIObject """ parts = int(math.ceil(fp.stat().st_size / float(chunksize))) err = False maxchunksize = 1024 * 1024 * 100 if chunksize >= maxchunksize: print( 'not uploaded: defined chunksize {0} is bigger than the allowed maximum {1}'.format(chunksize, maxchunksize)) return None part = 0 for part, chunk in enumerate(self.chunkedread(fp, chunksize),1):'({2})uploading part {0} of {1}'.format(part, parts, files = {'file': (str(, chunk)} res = self._post(self.fullUrl('/chunked_upload?chunk={0}').format(part), files=files) print('finish, uploaded part {0} of {1} '.format(part, parts)) res = self._post(self.fullUrl('chunked_upload/commit?filename={0}'.format( return self.getFile(res['file']['selfUrl']), self.getObject(res['relatedObject']['selfUrl'])
# relObj = res['relatedObject'] # obj = self.getObject(relObj['selfUrl']) # return obj
[docs] def postFolder(self, parent, name, check=True): """ creates the folder with a given name (name) inside a folder (parent) if not already exists :param Folder parent: the root folder :param str name: name of the folder which should be created :param bool check: it we should check if already exist, default = True :return: the folder object of the generated folder or the existing folder :rtype: Folder """ folder = vsdModels.Folder() if parent is None: parent = self.getFolderByName('MyProjects', mode='exact') folder.parentFolder = vsdModels.APIBase(selfUrl=parent.selfUrl) = name exists = False if check: if parent.childFolders: for child in parent.childFolders: fold = self.getFolder(child.selfUrl) if fold is not None: if == name: print('folder {0} already exists, id: {1}'.format(name, exists = True return fold else: print('unexpected error, folder exists but cannot be retrieved') exists = True # print(self.postRequest('folders', data = data)) if not exists: data = folder.to_struct() # for name, field in folder: # if name not in data: # data[name] = None # print(data) res = self.postRequest('folders', data=data) folder.populate(**res) print('folder {0} created, has id {1}'.format(name, assert == name return folder
[docs] def uploadFile(self, filename): """ push (post) a file to the server :param Path filename: the file to be uploaded :return: the file object containing the related object selfUrl :rtype: APIObject """ try: data ='rb').read() ##workaround for file without file extensions if filename.suffix == '': filename = filename.with_suffix('.dcm') files = {'file': (str(, data)} except: print("opening file", filename, "failed, aborting") return res = self._post(self.url + 'upload', files=files) return self.getFile(res['file']['selfUrl']), self.getObject(res['relatedObject']['selfUrl'])
################################################# # api objects handling (UPDATE) ################################################
[docs] def putObject(self, obj): """update an objects information :param APIObject obj: an APIObject :return: the updated object :rtype: APIObject """ res = self.putRequest(obj.selfUrl, data=obj.to_struct()) if res: obj = self.createObject(res) return obj else: return res
[docs] def putRequest(self, resource, data): """ update data of an object :param str resource: defines the relative path to the api resource :param json data: data to be added to the object :return: the updated object :rtype: json """ try: req = self._put(self.fullUrl(resource), json=data) return req except requests.exceptions.RequestException as err: print('request failed:', err) return None
[docs] def postRequestSimple(self, resource): """ post (create) a resource :param str resource: resource path :return: the resource object :rtype: json """ req = return req.json()
[docs] def putRequestSimple(self, resource): """ put (update) a resource :param str resource: resource path :return: the resource object :rtype: json """ req = self.s.put(self.fullUrl(resource)) return req.json()
[docs] def publishObject(self, obj): """ publisch an unvalidated object :param APIObject obj: the object to publish :return: returns the object :rtype: APIObject """ try: req = self.s.put(obj.selfUrl + '/publish') if req.status_code == print('object {0} published'.format( return self.getObject(obj.selfUrl) except requests.exceptions.RequestException as err: print('publish request failed:', err)
[docs] def deleteFolderContent(self, folder): """ delete all content from a folder (Folder) :param Folder folder: a folder object :return state: returns true if successful, else False :rtype: bool """ state = False folder.containedObjects = None res = self.putRequest('folders', data=folder.to_struct())
[docs] def postObjectRights(self, obj, group, perms, isuser=False): """ translate a set of permissions and a group into the appropriate format and add it to the object .. warning:: DEPRECATED: use postObjectGroupRights or postObjectUserRights! :param APIObject obj: () the object you want to add the permissions to :param APIGroup/APIUser group: group object or user object :param list perms: list of Object Rights (APIObjectRight), use getPermissionSet to retrive the ObjectRights based on the permission sets :param bool isuser: set True if the groups variable is a user. Default is False :return: a group or user rights object :rtype: APIObjectGroupRight,APIObjectUserRight """ # creat the dict of rights rights = list() for perm in perms: rights.append(dict([('selfUrl', perm.selfUrl)])) if isuser: objRight = vsdModels.ObjectUserRight() objRight.relatedObject = dict([('selfUrl', obj.selfUrl)]) objRight.relatedRights = rights objRight.relatedUser = dict([('selfUrl', group.selfUrl)]) res = self.postRequest('object-user-rights', data=objRight.to_struct()) objRight.set(res) else: objRight = vsdModels.ObjectGroupRight() objRight.relatedObject = dict([('selfUrl', obj.selfUrl)]) objRight.relatedRights = rights objRight.relatedGroup = dict([('selfUrl', group.selfUrl)]) res = self.postRequest('object-group-rights', data=objRight.to_struct()) objRight.set(res) return objRight
[docs] def postObjectUserRights(self, obj, user, perms): """ translate a set of permissions and a user into the appropriate format and add it to the object :param Object obj: the object you want to add the permissions to :param User user: user object :param list perms: list of Object Rights (APIObjectRight), use getPermissionSet to retrive the ObjectRights based on the permission sets :return: user rights object :rtype: ObjectUserRight """ # creat the dict of rights rights = list() for perm in perms: rights.append(dict([('selfUrl', perm.selfUrl)])) objRight = vsdModels.ObjectUserRight() objRight.relatedObject = dict([('selfUrl', obj.selfUrl)]) objRight.relatedRights = rights objRight.relatedUser = dict([('selfUrl', user.selfUrl)]) res = self.postRequest('object-user-rights', data=objRight.to_struct()) objRight.set(res) return objRight
[docs] def postObjectGroupRights(self, obj, group, perms): """ translate a set of permissions and a group into the appropriate format and add it to the object :param Object obj: the object you want to add the permissions to :param Group group: group object :param list perms: list of Object Rights (APIObjectRight), use getPermissionSet to retrive the ObjectRights based on the permission sets :return: group rights object :rtype: ObjectGroupRight """ # creat the dict of rights rights = list() for perm in perms: rights.append(dict([('selfUrl', perm.selfUrl)])) objRight = vsdModels.ObjectGroupRight() objRight.relatedObject = dict([('selfUrl', obj.selfUrl)]) objRight.relatedRights = rights objRight.relatedGroup = dict([('selfUrl', group.selfUrl)]) res = self.postRequest('object-group-rights', data=objRight.to_struct()) objRight.set(res) return objRight
[docs] def addOntologyToObject(self, obj, ontology, pos=0): """ add an ontoly term to an object :param APIBase obj: basic object :param Ontology ontology: ontology object :param int pos: position of the ontology term, default = 1 :return: returns true if successfully added :rtype: bool """ isset = False if isinstance(pos, int): onto = vsdModels.ObjectOntology() onto.position = pos onto.object = dict([('selfUrl', obj.selfUrl)]) onto.ontologyItem = dict([('selfUrl', ontology.selfUrl)]) onto.type = ontology.type res = self.postRequest('object-ontologies/{0}'.format(ontology.type), data=onto.to_struct()) if res: isset = True else: print('position needs to be a number (int)') return isset
[docs] def deleteFolder(self, folder, recursive=False): """remove a folder (Folder) :param Folder folder: the folder object :return: True if deleted, False if not :rtype: bool """ state = False self.deleteFolderContent(folder) res = self.delRequest(folder.selfUrl) if res == 200 or res == 204: state = True if recursive: folders = self.getContainedFolders(folder) for f in folders: return self.deleteFolder(f, recursive=recursive) return state
[docs] def createFolderStructure(self, rootfolder, filepath, parents): """ creates the folders based on the filepath if not already existing, starting from the rootfolder :param Folder rootfolder: the root folder object :param Path filepath: filepath of the file :param int parents: number of partent levels to create from file folder :return: the last folder in the tree :rtype: Folder """ fp = filepath.resolve() folders = list( folders.reverse() ##remove file from list if fp.is_file(): folders.remove(folders[0]) for i in range(parents, len(folders)): folders.remove(folders[i]) folders.reverse() fparent = rootfolder if fparent: for fname in folders: fchild = None if fparent: if fparent.childFolders: for child in fparent.childFolders: fold = self.getFolder(child.selfUrl) if == fname: fchild = fold if not fchild: f = vsdModels.Folder() = fname f.parentFolder = vsdModels.Folder(selfUrl=fparent.selfUrl) # f.toJson() res = self.postRequest('folders', f.to_struct()) fparent.populate(**res) else: fparent = fchild return fparent else: print('Root folder does not exist', rootfolder) # jData = jFolder(folder) return None
[docs] def addObjectToFolder(self, target, obj): """ add an object to the folder :param Folder target: the target folder :param Object obj: the object to copy :return: updated folder :rtype: Folder """ objSelfUrl = vsdModels.APIBase(**obj.to_struct()) if not objSelfUrl in target.containedObjects: target.containedObjects.append(objSelfUrl) res = self.putRequest('folders', data=target.to_struct()) target = vsdModels.Folder(**res) return target else: return target
[docs] def removeObjectFromFolder(self, target, obj): """ remove an object from the folder :param APIFolder target: the target folder :param APIObject obj: the object to remove :return: updated folder :rtype: APIFolder """ objSelfUrl = dict([('selfUrl', obj.selfUrl)]) objects = target.containedObjects isset = False if objects: if objects.count(objSelfUrl) > 0: objects.remove(objSelfUrl) target.containedObjects = objects res = self.putRequest('folders', data=target.to_struct()) if not isinstance(res, int): isset = True else: print('object not part of that folder') else: print('folder containes no objects') return isset