source: ndgCommon/trunk/ndg/common/src/clients/xmldb/eXist/existdbclient.py @ 4828

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/ndgCommon/trunk/ndg/common/src/clients/xmldb/eXist/existdbclient.py@4828
Revision 4828, 30.9 KB checked in by cbyrom, 12 years ago (diff)

Improve exist client checking of results + extend logging + add support
for producing DIF feeds + improve error handling.

Line 
1'''
2 Class supporting set up and communication with eXist DB
3 for the purposes of creating and updating atoms
4 
5 @author: C Byrom - Tessella 08
6'''
7import os, sys, logging, datetime, uuid, re
8from ndg.common.src.models.Atom import Atom
9from ndg.common.src.models import AtomState
10from ndg.common.src.clients.xmldb.eXist.eXistConnector import eXistConnector as ec
11from ndg.common.src.clients.xmldb.eXist.existdbfeedclient import eXistDBFeedClient as feedClient
12from ndg.common.src.dal.DocumentRetrieve import DocumentRetrieve as DR
13from ndg.common.src.lib.atomutilities import *
14from ndg.common.src.lib.existinitialiser import EXistInitialiser
15from xml.etree import ElementTree as ET
16from threading import Thread
17
class DuplicateError(Exception):
    '''
    Raised when an atom document that is about to be created already exists
    in eXist.  The message is written to the error log as soon as the
    exception is constructed.
    '''
    def __init__(self, msg):
        # record the problem in the log first, then build the exception itself
        logging.error(msg)
        super(DuplicateError, self).__init__(msg)
25
class backingUpThread(Thread):
    '''
    Worker thread that writes a backup copy of a document into eXist, so the
    caller does not have to wait for the backup write to finish
    '''

    def __init__ (self, existClient, doc, collection, fileName):
        '''
        @param existClient: client whose createEXistFile method performs the write
        @param doc: contents of the document to back up
        @param collection: collection to write the backup copy into
        @param fileName: name to give the backup copy
        '''
        logging.info("Setting up thread to run backup for file, '%s'" %fileName)
        super(backingUpThread, self).__init__()
        self.ec, self.doc = existClient, doc
        self.collection, self.fileName = collection, fileName
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Write the backup document into eXist via the stored client
        '''
        logging.info("Running thread to perform backup of file, '%s'" %self.fileName)
        self.ec.createEXistFile(self.doc, self.collection, self.fileName)
        logging.info("- finished backing up file")
41
class publishingThread(Thread):
    '''
    Worker thread that publishes an atom (feed entries and, for data
    entities, DIF documents) without blocking the caller
    '''

    def __init__ (self, edc, atom):
        '''
        @param edc: the eXistDBClient whose private publish method is invoked
        @param atom: the atom to publish
        '''
        logging.info("Setting up thread to publish atom data for atom, '%s'" %atom.datasetID)
        super(publishingThread, self).__init__()
        self.edc, self.atom = edc, atom
        logging.info("- finished setting up thread")

    def run(self):
        '''
        Publish the stored atom - NB, calls the client's name-mangled private
        __publishAtom method directly
        '''
        logging.info("Running thread to publish atom data for atom, '%s'" %self.atom.datasetID)
        self.edc._eXistDBClient__publishAtom(self.atom)
        logging.info("- finished publishing atom data")
55       
56
class eXistDBClient:
    '''
    Client supporting set up of, and communication with, an eXist DB - used
    to create, update, back up, restore, validate and publish atom documents
    (and the DIF documents derived from data entity atoms)
    '''

    def __init__(self, configFile = None, eXistDBHostname = None, \
                 eXistPortNo = '8080', \
                 loadCollectionData=False, setUpDB = False):
        '''
        Initialise a connection to the eXistDB
        @keyword configFile: config file to use in setting up DB
        @keyword eXistDBHostname: name of eXist DB to use - if not specified, the first
        host in the config file is used
        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
        to '8080' if not set
        @keyword loadCollectionData: preload info on all the eXist collections, if True (default False)
        @keyword setUpDB: if True, create the basic collection structure and ingest the
        atom schemas.  Default is False.
        '''
        logging.info("Initialising connection to eXist DB")
        self.eXistDBHostname = eXistDBHostname
        self.eXistPortNo = eXistPortNo
        logging.debug("- connecting to DB, '%s:%s', with config file, '%s'" \
                      %(eXistDBHostname or 'Default', self.eXistPortNo, \
                        configFile or 'Default'))

        inputs = {}

        self.atomSchema = None

        # this keeps a record of the last file backed up - in case we need
        # to do a rollback if there's an error encountered
        self.backupName = None

        # if we preload the collections data, it'll be stored here - as a
        # dict with key/val of atomID/collectionPath
        self.collections = None

        # dict to hold the data centre element info - used to augment atom info
        # when doing atom->DIF translations - to avoid having to constantly
        # look this up for the different provider IDs
        # NB, set up before any DB work below, so the attribute is always
        # available to methods that may be called during initialisation
        self.dataCentres = {}

        # NB, there are two routes through here: if a config file is specified
        # without a hostname, the host will be taken to be the first entry in
        # the config file; if a hostname is specified, it will be used explicitly
        if configFile:
            inputs['pwfile'] = configFile
            if not self.eXistDBHostname:
                self.__loadDBDetails(configFile)

        # Now set up the connection
        logging.debug(inputs)
        self.xmldb = DR(self.eXistDBHostname, **inputs)

        # set up feed client, too - NB, info should be added to the feed as it is added
        # to the eXist collections, so the standard dbclient and the feed client
        # are intrinsically linked
        self.feedClient = feedClient(self.xmldb.auth,
                                     eXistDBHostname = self.eXistDBHostname,
                                     eXistPortNo = self.eXistPortNo)

        if setUpDB:
            # set up the eXist DB appropriately for use with ndgCommon
            initialiser = EXistInitialiser(self)
            initialiser.initialise()

        if loadCollectionData:
            self.collections = self.getAllAtomCollections()

        logging.info("eXist DB connection initialised")


    def __getSchema(self):
        '''
        Return the eXist path of the atom schema - computed once and cached
        '''
        logging.debug("Getting atom schema data")
        if not self.atomSchema:
            self.atomSchema = ec.SCHEMATA_COLLECTION_PATH + '/' + \
                self.xmldb.xq.ATOM_MOLES_SCHEMA  + '.xsd'

        return self.atomSchema

    AtomSchema = property(fget=__getSchema, doc="Atom schema path")


    def createCollections(self, collections):
        '''
        Create the specified collections in eXist
        @param collections: array of collections to create
        @return True if successful
        '''
        logging.info("Setting up eXist collections")
        for col in collections:
            logging.debug("Creating collection, '%s'" %col)
            self.xmldb.createCollection(col)
        logging.info("All collections set up")
        return True


    def getAtom(self, id):
        '''
        Lookup the atom with id
        @param id: id of the atom to retrieve
        @return: the retrieved atom document, as returned by DocumentRetrieve.get
        '''
        logging.info("Looking up atom with id, '%s'" %(id))
        doc = self.xmldb.get('', DR.ATOM, id, \
                             targetCollection = ec.BASE_COLLECTION_PATH)
        logging.info("Atom retrieved")
        return doc


    def checkAtomSchemaCompliance(self, atomPath, atom = None, isDebug = False):
        '''
        Validate the specified atom in eXist with the atom schemata in eXist
        @param atomPath: path to the atom in eXist
        @keyword atom: if set to an atom, this will be created temporarily in eXist
        - since it may not already exist there.  Once validation is completed, the
        file will be removed from eXist.  NB, atomPath is ignored in this case.
        @keyword isDebug: if True, return full error details, otherwise only return
        a summary
        @raise SystemError: if the validation could not be run, or did not produce
        exactly one validation report
        @return array: containing any errors found - NB, if an empty array is returned,
        this indicates successful validation
        '''
        logging.info("Validating atom, '%s' against schemata in eXist" %atomPath)
        # path of the temporary copy of 'atom' - only set once that copy has
        # actually been created, so clean up can never delete any other file
        tempPath = None
        resultID = None
        errorMessage = None
        try:
            if atom:
                logging.info("Creating temporary file in eXist to do validation against")
                fileName = atom.datasetID + str(datetime.datetime.today().microsecond)
                self.createEXistFile(atom.toPrettyXML(), \
                                     atom.getDefaultCollectionPath(), fileName)
                atomPath = atom.getDefaultCollectionPath() + fileName
                tempPath = atomPath

            validationQuery = 'validation:validate-report("' + atomPath + \
                '", xs:anyURI("' + self.AtomSchema + '"))'
            logging.debug("Running validation, '%s'" %validationQuery)

            resultID, result = self.xmldb.executeQuery(validationQuery)
            if result['hits'] == 0:
                errorMessage = "Validation did not complete successfully - please retry"
            elif result['hits'] > 1:
                errorMessage = "More than one atom was validated - expecting only a single atom validation - please retry"

        except Exception:
            # NB, sys.exc_info() keeps this valid across python versions and
            # avoids the deprecated 'message' attribute
            errorMessage = "Error encountered whilst validating atom: '%s'" \
                %sys.exc_info()[1]

        if tempPath:
            logging.info("Deleting temporary file in eXist")
            self.deleteEXistFile(tempPath)

        if errorMessage:
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        doc = self.xmldb.retrieve(resultID, 0)
        et = ET.fromstring(doc)
        status = et.findtext('status')

        # retrieve the error detail if invalid
        errors = []
        if status == 'invalid':
            logging.info("Atom is invalid - details as follows:")
            for error in et.findall('message'):
                lineNo = error.attrib.get('line')
                colNo = error.attrib.get('column')
                level = error.attrib.get('level')
                repeat = error.attrib.get('repeat')
                # NB, cope with message elements that have no text
                errorText = error.text or ''
                # remove the meaningless error type from message
                if errorText.startswith('cvc-'):
                    errorText = ':'.join(errorText.split(':')[1:])
                errorMessage = "%s at line %s, column %s: %s" %(level, lineNo, colNo, errorText)
                if repeat:
                    errorMessage += " (%s times)" %repeat

                if isDebug:
                    errors.append(errorMessage)
                else:
                    errors.append(errorText)
                logging.info(errorMessage)
        else:
            logging.info("Atom is valid")

        logging.info("Validation complete")
        return errors


    def __loadDBDetails(self, configFile):
        '''
        Retrieve info from the eXist db config file - a whitespace separated
        file whose first three entries are hostname, username and password
        @raise ValueError: if the file is missing or has fewer than three entries
        '''
        logging.info("Loading DB config data")
        # Check this file exists
        if not os.path.isfile(configFile):
            errorMessage = "Could not find the DB config file, %s; please make sure this " \
                     "is available from the running directory" %configFile
            logging.error(errorMessage)
            raise ValueError(errorMessage)

        # NB, always close the file - even if the read fails
        dbinfo_file = open(configFile, "r")
        try:
            dbinfo = dbinfo_file.read().split()
        finally:
            dbinfo_file.close()

        if len(dbinfo) < 3:
            errorMessage = 'Incorrect data in DB config file'
            logging.error(errorMessage)
            raise ValueError(errorMessage)
        self.eXistDBHostname = dbinfo[0]
        self._username = dbinfo[1]
        self._pw = dbinfo[2]
        logging.info("DB config data loaded")


    def __lookupEXistFile(self, docPath):
        '''
        Look up a file in eXist using XPath
        @param docPath: path to doc to look up
        @raise SystemError: if the lookup result reports an 'invalid' status
        @return: id returned from query, with which to retrieve doc; if doc doesn't exist, return None
        '''
        logging.info("Retrieving info for file, '%s'" %docPath)

        resultID, summary = self.xmldb.executeQuery('doc("' + docPath + '")')

        if summary['hits'] == 0:
            logging.info("File does not exist in eXist DB")
            return None

        # check status of result
        doc = self.xmldb.retrieve(resultID, 0)
        status = ET.fromstring(doc).findtext('status')
        if status == 'invalid':
            raise SystemError("Problem occured whilst looking up file, '%s'" %docPath)

        logging.info("Found file - returning result ID")
        return resultID


    def isNewEXistCollection(self, collectionPath):
        '''
        Look up a collection in eXist using XPath
        @param collectionPath: path to collection to look up
        @return: False if collection exists, True otherwise
        '''
        logging.info("Checking if collection, '%s', exists in eXist" %collectionPath)

        resultID, summary = self.xmldb.executeQuery('collection("' + collectionPath + '")')
        if summary['hits'] == 0:
            logging.info("Collection does not exist in eXist DB")
            return True
        logging.info("Found collection")
        return False


    def getEXistFile(self, docPath):
        '''
        Use XQuery to retrieve the specified document from eXist
        @param docPath: the path of the doc to retrieve
        @return: contents of document if exists, None otherwise
        '''
        resultID = self.__lookupEXistFile(docPath)

        # NB, a result id of 0 is valid
        if not resultID and resultID != 0:
            logging.info("No file found - nothing to retrieve")
            return None

        logging.info("Found file - now retrieving content")
        return self.xmldb.retrieve(resultID, 0)


    def isNewEXistFile(self, docPath):
        '''
        Test if a file already exists in eXist
        @param docPath: path of file in eXist to look up
        @return: True if a new file, False if otherwise
        '''
        logging.info("Checking if file, '%s', exists in eXist DB" %docPath)

        resultID = self.__lookupEXistFile(docPath)

        # NB, a result id of 0 is valid - so only treat a missing id as 'new'
        # (consistent with getEXistFile)
        if not resultID and resultID != 0:
            return True

        return False


    def __addTimeStamp(self, fileName):
        '''
        Add timestamp to input filename
        NB, this assumes there is a file type identifier at the end of the filename; if so, the datestamp
        is included before this; if not it is just added at the end
        '''
        bits = fileName.rsplit(".", 1)
        fileName = bits[0] + "_" + datetime.datetime.today().strftime("%Y-%m-%dT%H_%M_%S")

        if len(bits) > 1:
            fileName += "." + bits[1]
        return fileName


    def __removeTimeStamp(self, fileName):
        '''
        Remove a timestamp, as added by __addTimeStamp, from a file name
        '''
        match = re.search(r'(.*)(_\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2})(.*)', fileName)
        if match:
            return match.group(1) + match.group(3)

        return fileName


    def backupEXistFile(self, collection, fileName, runAsynch = True):
        '''
        Backup a file that exists in the eXist DB
        - NB, this really just creates a new file with the same contents in a
        backup dir
        - to improve efficiency, spawn this process as a new thread since we
        don't need to worry about the outcome
        @param collection: path of the collection to store the file in
        @param fileName: name of file to add in eXist
        @param runAsynch: if True, do the backup asynchronously in a separate thread
        @raise SystemError: if the file contents cannot be retrieved
        @return: path to new backup file
        '''
        if not collection.endswith('/'):
            collection += '/'

        docPath = collection + fileName
        logging.info("Backing up file, '%s', in eXist DB" %docPath)

        logging.debug("Firstly, retrieve file contents from eXist")
        doc = self.getEXistFile(docPath)
        if not doc:
            errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        # Now adjust the collection to map to the backup dir
        collection = collection.replace(ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH)
        collection = collection.replace(ec.NDG_A_COLLECTION_PATH, ec.NDG_A_COLLECTION_PATH_BACKUP)

        # add timestamp to filename
        fileName = self.__addTimeStamp(fileName)
        docPath = collection + fileName

        if runAsynch:
            # run the back up in a separate thread - NB, the contents have
            # already been read, so later overwrites of the original are safe
            thread = backingUpThread(self, doc, collection, fileName)
            thread.start()
        else:
            self.createEXistFile(doc, collection, fileName)

        return docPath


    def createEXistFile(self, xml, collection, fileName):
        '''
        Add the input file to the eXist DB
        @param xml: contents of xml file to create in eXist
        @param collection: path of the collection to store the file in
        @param fileName: name of file to add in eXist
        @raise SystemError: if the store command fails
        @return: True, if file created successfully
        '''
        logging.info("Adding file, '%s' to eXist DB collection, '%s'" \
                     %(fileName, collection))
        logging.debug("data: %s" %xml)

        # create the collection, if it doesn't already exist - NB, this won't overwrite anything
        # NOTE(review): self.collections is keyed by atom ID, not collection
        # path, so this lookup rarely matches and the collection is normally
        # (harmlessly) re-created
        if self.collections is None or not self.collections.get(collection):
            self.createCollections([collection])

        status = self.xmldb.storeXML(xml, collection + "/" + fileName, overwrite=1)
        if not status:
            errorMessage = "Command to create file in eXist did not complete successfully - exiting"
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        logging.info("File added to eXist")
        return True


    def deleteEXistFile(self, docPath):
        '''
        Delete the input file from eXist DB
        @param docPath: path of document to delete
        @raise SystemError: if the delete command fails
        @return: True, if file deleted successfully
        '''
        logging.info("Deleting file, '%s', from eXist DB" %docPath)

        status = self.xmldb.removeDoc(docPath)
        if not status:
            errorMessage = "Command to delete file in eXist did not complete successfully - exiting"
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        logging.info("File deleted from eXist")
        return True


    def createOrUpdateEXistFile(self, xml, collection, fileName):
        '''
        Check if a file already exists in eXist; if it does, run an
        update (which will backup the existing file), otherwise create
        the file in eXist
        @param xml: contents of xml file to create/update in eXist
        @param collection: path of the collection to store the file in - with
        or without a trailing '/'
        @param fileName: name of file to add in eXist
        '''
        logging.info("Creating or updating file in eXist...")
        # NB, the collection may not end with a '/' - make sure the existence
        # check uses the same path the file is actually stored under
        if collection.endswith('/'):
            docPath = collection + fileName
        else:
            docPath = collection + '/' + fileName

        if not self.isNewEXistFile(docPath):
            self.backupEXistFile(collection, fileName)

        self.createEXistFile(xml, collection, fileName)


    def getAllAtomIDs(self):
        '''
        Retrieve all the atom IDs in the atoms directory - NB, this can
        be a quick way of producing a cache of data to check - e.g. to avoid
        multiple calls to getAtomFileCollectionPath
        @return: ids - array of all atom IDs
        '''
        logging.info("Retrieving all atom ids")
        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
        resultID, summary = self.xmldb.executeQuery(xq)
        if summary['hits'] == 0:
            return []

        doc = self.xmldb.retrieve(resultID, 0)
        ids = [member.findtext('{http://www.w3.org/2005/Atom}repositoryID') \
               for member in ET.fromstring(doc)]
        logging.debug("Found ids, '%s'" %ids)
        return ids


    def getAllAtomCollections(self):
        '''
        Get all atom collection paths and store in a dictionary - for easy
        reference when doing lots of things at once
        @return: dict with key/val of atomID/collectionPath - empty if no atoms
        are found
        '''
        logging.info("Retrieving all atom collection paths")
        colData = {}
        xq = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
        resultID, summary = self.xmldb.executeQuery(xq)
        if summary['hits'] == 0:
            # NB, return an empty dict, not a list - callers use .get() on this
            return colData

        doc = self.xmldb.retrieve(resultID, 0)
        for member in ET.fromstring(doc):
            fullPath = member.findtext('{http://www.w3.org/2005/Atom}fileName')
            if not fullPath:
                # nothing to record for entries without a file name
                continue
            pathBits = fullPath.split('/')
            atomID = pathBits[-1].split('.')[0]
            colData[atomID] = '/'.join(pathBits[0:-1])

        logging.debug("Finished looking up atom paths")
        return colData


    def getAtomPublicationState(self, atomID):
        '''
        Retrieve the publication state of the specified atom - by
        checking the collection it is in
        @param atomID: atom id to look up
        @return: AtomState for the atom.  NB, if the ID is not found, assume
        we're dealing with a new atom and set the state as the working state
        '''
        logging.debug("Finding atom publication state")
        path = self.getAtomFileCollectionPath(atomID)
        # NB, path is None if the atom is not in eXist - fall through to the
        # working state in this case
        if path:
            for state in AtomState.allStates.values():
                if path.find('/%s' %state.collectionPath) > -1:
                    logging.debug("- state found: '%s'" %state.title)
                    return state

        logging.debug("- state not found - returning WORKING state")
        return AtomState.WORKING_STATE


    def getAtomFileCollectionPath(self, atomID):
        '''
        Given an atom id, determine and return the collection path in eXist
        of the associated atom file
        @param atomID: atom id to look up
        @return: collection path (with trailing '/'), if it exists, None, otherwise
        '''
        logging.info("Looking up collection path for atom ID, '%s'" %atomID)
        xq = self.xmldb.xq['atomFullPath']
        xq = xq.replace('TargetCollection', ec.BASE_COLLECTION_PATH)
        xq = xq.replace('LocalID', atomID)

        resultID, summary = self.xmldb.executeQuery(xq)
        if summary['hits'] == 0:
            logging.info("No document found with the specified ID")
            return None

        doc = self.xmldb.retrieve(resultID, 0, {})

        collPath = ET.fromstring(doc).text + '/'
        logging.debug("Found collection path, '%s'" %collPath)
        return collPath


    def deleteAtomInExist(self, atom):
        '''
        Delete the given atom from the eXist DB - using the atom
        details to work out the required path to delete
        @param atom: Atom data model of the atom to delete
        '''
        logging.info("Deleting atom from eXist")
        atomPath = atom.getDefaultCollectionPath() + atom.atomName
        self.deleteEXistFile(atomPath)
        logging.info("Atom deleted")


    def changeAtomPublicationStateInExist(self, atom, newState):
        '''
        Adjust the publication state of an atom in eXist
        @param atom: the Atom data model of the atom whose publication state
        needs to change
        @param newState: an AtomState object representing the new publication
        state of the atom
        @return atom: atom data model with updated state
        '''
        logging.info("Changing the publication state of atom - from '%s' to '%s'" \
                     %(atom.state.title, newState.title))
        oldState = atom.state
        # firstly, create atom in new publication state collection - so data isn't
        # lost if this fails
        atom.state = newState
        self.createEXistFile(atom.toPrettyXML(), atom.getDefaultCollectionPath(),
                             atom.atomName)

        # now delete atom in the old publication state - NB, the state is
        # temporarily reset so that the old collection path is used
        atom.state = oldState
        self.deleteAtomInExist(atom)
        logging.info("- atom created in new publication state and removed from old one")
        atom.state = newState

        # update feeds + create DIFs, if needed
        if atom.isPublished():
            self.runAsynchAtomPublish(atom)

        return atom


    def createAtomInExist(self, atom, replaceAtom = True, runAsynch = True):
        '''
        Create an atom in the eXist DB - using the atom contents to work out
        the location + data set ID
        @param atom: atom object to create in the DB
        @keyword replaceAtom: if False and the atom is already available in eXist
        then raise a DuplicateError; if True (default), back up the existing atom
        and overwrite it
        @keyword runAsynch: if True, if a backup of an existing file, do this
        asynchronously in a separate thread + do the feed publishing and DIF
        creating in a separate thread, too
        @raise ValueError: if the input is not an Atom object
        @raise DuplicateError: if the atom exists and replaceAtom is False
        @return: the atom, as created in eXist
        '''
        logging.info("Creating atom in eXist")
        if not atom:
            raise ValueError("Input is not an object - cannot create in eXist")
        if not isinstance(atom, Atom):
            raise ValueError("Input object is not an Atom object - cannot create in eXist")

        # if the atom has no dataset ID, generate and add one
        # NB, this should only be the case when the atom is being created
        # via the web interface
        isNew = False
        if not atom.datasetID:
            isNew = True
            atom.setDatasetID(atom.atomTypeID + '_' + str(uuid.uuid1()))

        eXistCollection = None
        if self.collections is not None: # cope with empty dict
            eXistCollection = self.collections.get(atom.datasetID)
        else:
            eXistCollection = self.getAtomFileCollectionPath(atom.datasetID)

        # if collection not found, assume we're dealing with a new atom; get its
        # default collection
        if not eXistCollection:
            eXistCollection = atom.getDefaultCollectionPath()

            # check if we need a new provider feed set up
            providerCollection = ec.PROVIDER_FEED_PATH + atom.ME.providerID + '/'
            if self.isNewEXistCollection(providerCollection):
                logging.info("Creating feed for new provider ID")
                self.createCollections([providerCollection])
                self.feedClient.createAtomFeed(providerCollection,
                                               self.feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE %atom.ME.providerID)

        elif isNew:
            # in this situation we're trying to create an atom with the same
            # name via the web interface - this can't be allowed - so retry to
            # generate a new ID; NB, return the created atom and keep the
            # caller's options
            atom.datasetID = None
            return self.createAtomInExist(atom, replaceAtom = replaceAtom,
                                          runAsynch = runAsynch)
        # create backup of atom if it already exists
        else:
            if not replaceAtom:
                raise DuplicateError('An atom with the specified ID (%s) already exists in eXist' \
                                     %atom.datasetID)
            # store name of backup - to allow restore, if subsequent ops fail
            self.backupName = self.backupEXistFile(eXistCollection, atom.atomName, \
                                                   runAsynch = runAsynch)

            # also change updated date to current time
            atom.updatedDate = datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ")

        self.createEXistFile(atom.toPrettyXML(), eXistCollection, atom.atomName)

        logging.info("Atom created in eXist")

        # lastly, if we're dealing with a published atom, update DIF records +
        # feeds
        if atom.isPublished():
            if runAsynch:
                self.runAsynchAtomPublish(atom)
            else:
                self.__publishAtom(atom)

        return atom


    def runAsynchAtomPublish(self, atom):
        '''
        Publish the atom (feeds + DIF, if applicable) in a separate thread
        @param atom: the atom to publish
        '''
        thread = publishingThread(self, atom)
        thread.start()


    def __publishAtom(self, atom):
        '''
        Add atom info to the various feeds - and if it is a data entity, use
        it to create a DIF document and add this to feeds, also
        '''
        if atom.isDE():
            self.createDIFDocumentFromAtom(atom)

        self.feedClient.addAtomToFeeds(atom)


    def restoreBackup(self, docPath):
        '''
        Restore the backed up file - effectively recreating in the non-backup collection
        @param docPath: path to the backup file to restore
        @raise SystemError: if the backup file contents cannot be retrieved
        '''
        logging.info("Restoring file, '%s' in eXist" %docPath)
        doc = self.getEXistFile(docPath)

        if not doc:
            errorMessage = "Could not retrieve file contents (%s) to restore - exiting." %docPath
            logging.error(errorMessage)
            raise SystemError(errorMessage)

        bits = docPath.split('/')
        # remove timestamp from filename
        fileName = self.__removeTimeStamp(bits[-1])
        collection = '/'.join(bits[0:-1])

        # Now adjust the collection to map back from the backup dir
        collection = collection.replace(ec.BACKUP_COLLECTION_PATH, ec.BASE_COLLECTION_PATH)
        collection = collection.replace(ec.NDG_A_COLLECTION_PATH_BACKUP, ec.NDG_A_COLLECTION_PATH)

        self.createEXistFile(doc, collection, fileName)
        logging.info("File restored")


    def transformAtomIntoDIF(self, atom):
        '''
        Transform an atom into a DIF document - using an XQuery transform ran
        in eXist
        @param atom: the Atom data model to convert
        @raise Exception: if the transform query produces no results
        @return: the produced DIF document
        '''
        logging.info("Creating DIF record from atom - using XQuery transform")

        # get the query and set this up to use properly
        targetCollection = atom.getPublicationStatePath()
        providerID = atom.ME.providerID
        # remove the trailing forward slash - otherwise the xquery won't work
        if targetCollection.endswith('/'):
            targetCollection = targetCollection[0:-1]
        xquery = self.xmldb.xq.actual('atom2DIF', targetCollection, \
                                 providerID, atom.datasetID)

        resultID, summary = self.xmldb.executeQuery(xquery)
        if 'hits' in summary and not summary['hits']:
            raise Exception("XQuery produced no results - suggesting a problem with the query")
        doc = self.xmldb.retrieve(resultID, 0)

        # add various missing info
        # get the organisation data for the repository
        # - NB, store for re-use to avoid multiple lookups of the same info
        if providerID in self.dataCentres:
            dataCentre = self.dataCentres[providerID]
        else:
            dataCentre = getDataCentreDIFElement(providerID, self.xmldb)
            self.dataCentres[providerID] = dataCentre

        # add various other data to atoms - to make up with the incomplete data spec there
        doc = addOrgData(doc, targetCollection, dataCentre)
        doc = expandParametersData(doc)
        doc = addStandardKeywords(doc)

        logging.info("Transform completed successfully - returning DIF doc")
        return doc


    def createDIFDocumentFromAtom(self, atom):
        '''
        Transform an atom into a DIF document and store this in eXist
        @param atom: the Atom data model to convert
        '''
        logging.info("Creating and storing DIF document")
        doc = self.transformAtomIntoDIF(atom)
        fileName = atom.atomName.replace('.atom', '.xml')

        # now add to eXist
        providerID = atom.ME.providerID
        collectionPath = ec.DIF_COLLECTION_PATH + providerID

        # NB, check if a top level provider collection exists; if not, add
        # this with a feed
        # NB, the collection will be created along with the DIF file - so set the
        # feed up after the file has been created
        setupFeed = self.isNewEXistCollection(collectionPath)

        self.createOrUpdateEXistFile(doc, collectionPath, fileName)
        if setupFeed:
            logging.info("Creating feed for new provider ID")
            self.createCollections([collectionPath])
            # NB, the trailing '/' belongs on the collection path (as for the
            # provider atom feeds), not on the feed title
            self.feedClient.createAtomFeed(collectionPath + '/',
                                           self.feedClient.PROVIDERLEVEL_DIF_FEED_TITLE %providerID)
        logging.info("DIF document created and stored")
794
Note: See TracBrowser for help on using the repository browser.