1 | ''' |
---|
2 | Class supporting set up and communication with eXist DB |
---|
3 | for the purposes of creating and updating atoms |
---|
4 | |
---|
5 | @author: C Byrom - Tessella 08 |
---|
6 | ''' |
---|
7 | import os, sys, logging, datetime, uuid, re |
---|
8 | from ndg.common.src.models.Atom import Atom |
---|
9 | from ndg.common.src.models import AtomState |
---|
10 | from ndg.common.src.clients.xmldb.eXist.eXistConnector import eXistConnector as ec |
---|
11 | from ndg.common.src.clients.xmldb.eXist.existdbfeedclient import eXistDBFeedClient as feedClient |
---|
12 | from ndg.common.src.dal.DocumentRetrieve import DocumentRetrieve as DR |
---|
13 | from ndg.common.src.lib.atomutilities import * |
---|
14 | from xml.etree import ElementTree as ET |
---|
15 | from threading import Thread |
---|
16 | |
---|
class DuplicateError(Exception):
    '''
    Raised when an atom document with an already-used ID is found in eXist.
    The message is logged at error level as soon as the exception is built.
    '''
    def __init__(self, msg):
        logging.error(msg)
        super(DuplicateError, self).__init__(msg)
---|
24 | |
---|
class backingUpThread(Thread):
    '''
    Thread that writes a backup copy of a file into eXist, so the caller
    does not have to wait for the write to complete.
    '''

    def __init__(self, existClient, doc, collection, fileName):
        '''
        @param existClient: client whose createEXistFile(doc, collection, fileName)
        performs the write
        @param doc: contents of the file to back up
        @param collection: collection to write the backup copy into
        @param fileName: name to give the backup copy
        '''
        logging.info("Setting up thread to run backup for file, '%s'" %fileName)
        super(backingUpThread, self).__init__()
        self.ec, self.doc = existClient, doc
        self.collection, self.fileName = collection, fileName
        logging.info("- finished setting up thread")

    def run(self):
        '''Write the backup copy using the client's createEXistFile.'''
        logging.info("Running thread to perform backup of file, '%s'" %self.fileName)
        self.ec.createEXistFile(self.doc, self.collection, self.fileName)
        logging.info("- finished backing up file")
---|
40 | |
---|
class publishingThread(Thread):
    '''
    Thread that runs an eXistDBClient's private atom-publishing step in the
    background.
    '''

    def __init__(self, edc, atom):
        '''
        @param edc: the eXistDBClient whose private __publishAtom is invoked
        @param atom: the atom to publish
        '''
        logging.info("Setting up thread to publish atom data for atom, '%s'" %atom.datasetID)
        super(publishingThread, self).__init__()
        self.edc, self.atom = edc, atom
        logging.info("- finished setting up thread")

    def run(self):
        '''Invoke the client's name-mangled __publishAtom for the stored atom.'''
        logging.info("Running thread to publish atom data for atom, '%s'" %self.atom.datasetID)
        publish = getattr(self.edc, '_eXistDBClient__publishAtom')
        publish(self.atom)
        logging.info("- finished publishing atom data")
---|
54 | |
---|
55 | |
---|
56 | class eXistDBClient: |
---|
57 | |
---|
def __init__(self, configFile = None, eXistDBHostname = None,
             eXistPortNo = '8080',
             loadCollectionData=False, setUpDB = False):
    '''
    Open a connection to the eXist DB, together with the linked feed client.
    @keyword configFile: DB config file - passed to the DocumentRetrieve client
    as 'pwfile'; if no hostname is given, the hostname is read from it
    @keyword eXistDBHostname: name of the eXist DB host to use - if not
    specified, the first entry in the config file is used
    @keyword eXistPortNo: port the eXist DB is exposed on - defaults to '8080'
    @keyword loadCollectionData: if True, preload the atom ID -> collection
    path mapping for all atoms (default False)
    @keyword setUpDB: if True, create the basic collection structure, ingest
    the atom schemata and organisation data and set up the basic feeds.
    Default is False.
    '''
    logging.info("Initialising connection to eXist DB")
    self.eXistDBHostname = eXistDBHostname
    self.eXistPortNo = eXistPortNo
    logging.debug("- connecting to DB, '%s:%s', with config file, '%s'"
                  %(eXistDBHostname or 'Default', self.eXistPortNo,
                    configFile or 'Default'))

    # keyword args for the DocumentRetrieve connection
    connectionArgs = {'pwfile': configFile} if configFile else {}

    self.atomSchema = None

    # path of the last file backed up - kept in case a rollback is needed
    # after a subsequent error
    self.backupName = None

    # NB, two routes through here: with a config file but no hostname, the
    # host is taken from the config file; an explicit hostname is used as given
    if configFile and not self.eXistDBHostname:
        self.__loadDBDetails(configFile)

    logging.debug(connectionArgs)
    self.xmldb = DR(self.eXistDBHostname, **connectionArgs)

    # feed client - info should be added to the feeds as it is added to the
    # eXist collections, so it is built with the same auth as the DB client
    self.feedClient = feedClient(self.xmldb.auth,
                                 eXistDBHostname = self.eXistDBHostname,
                                 eXistPortNo = self.eXistPortNo)

    if setUpDB:
        # collections, schemata, organisation data, then the basic feeds -
        # NB, creating collections that already exist loses no files
        for setUpStep in (self.__setUpEXistAtomCollections,
                          self.__addAtomSchema,
                          self.__uploadOrgData,
                          self.feedClient.setupBasicFeeds):
            setUpStep()

    self.collections = self.getAllAtomCollections() if loadCollectionData else None

    # data centre element info, used to augment atom info in atom->DIF
    # translations - cached here to avoid looking it up per provider ID
    self.dataCentres = {}

    logging.info("eXist DB connection initialised")
---|
129 | |
---|
130 | |
---|
def __getSchema(self):
    '''
    Return the eXist path of the combined atom/MOLES schema (.xsd), building
    it from the base collection path on first use and caching it.
    '''
    logging.debug("Getting atom schema data")
    if self.atomSchema:
        return self.atomSchema
    schemaFile = self.xmldb.xq.ATOM_MOLES_SCHEMA + '.xsd'
    self.atomSchema = ec.BASE_COLLECTION_PATH + schemaFile
    return self.atomSchema

AtomSchema = property(fget=__getSchema, doc="Atom schema path")
---|
140 | |
---|
141 | |
---|
def __uploadOrgData(self):
    '''
    Upload the organisation data resources to eXist - these are required for
    the atom->DIF output.
    Each (fileName, content) pair in self.xmldb.xq.resources is stored in the
    base atom collection via createEXistFile.
    '''
    logging.info("Uploading organisation data to eXist")
    for fileName, orgData in self.xmldb.xq.resources.items():
        self.createEXistFile(orgData, ec.BASE_COLLECTION_PATH, fileName)
    logging.info("Data uploaded")
---|
151 | |
---|
152 | |
---|
def createCollections(self, collections):
    '''
    Create the specified collections in eXist
    @param collections: iterable of collection paths to create
    @return: True once a create command has been issued for every collection
    '''
    logging.info("Setting up eXist collections")
    for col in collections:
        logging.debug("Creating collection, '%s'" %col)
        self.xmldb.createCollection(col)
    logging.info("All collections set up")
    # the docstring has always promised a True result - honour it
    return True
---|
164 | |
---|
165 | |
---|
def getAtom(self, id):
    '''
    Fetch the atom with the given id from the base atom collection.
    @param id: id of the atom to retrieve
    @return: the document returned by the DocumentRetrieve client's get()
    '''
    logging.info("Looking up atom with id, '%s'" % id)
    lookupArgs = {'targetCollection': ec.BASE_COLLECTION_PATH}
    atomDoc = self.xmldb.get('', DR.ATOM, id, **lookupArgs)
    logging.info("Atom retrieved")
    return atomDoc
---|
176 | |
---|
177 | |
---|
def checkAtomSchemaCompliance(self, atomPath, atom = None, isDebug = False):
    '''
    Validate an atom in eXist against the atom schemata stored in eXist
    @param atomPath: path to the atom in eXist
    @keyword atom: if set to an atom, this is written temporarily to eXist
    (it may not exist there yet) and that copy is validated instead of
    atomPath; the temporary copy is removed once validation has run
    @keyword isDebug: if True, return full error details (level, line,
    column, repeat count), otherwise only the error text
    @raise SystemError: if the validation query fails or does not return
    exactly one result
    @return list: errors found - an empty list means successful validation
    '''
    logging.info("Validating atom, '%s' against schemata in eXist" %atomPath)
    errorMessage = None
    resultId = None
    tempFileCreated = False
    try:
        if atom:
            logging.info("Creating temporary file in eXist to do validation against")
            # suffix with microseconds to avoid clobbering the real atom file
            fileName = atom.datasetID + str(datetime.datetime.today().microsecond)
            collectionPath = atom.getDefaultCollectionPath()
            self.createEXistFile(atom.toPrettyXML(), collectionPath, fileName)
            atomPath = collectionPath + fileName
            tempFileCreated = True

        validationQuery = 'validation:validate-report("' + atomPath + \
            '", xs:anyURI("' + self.AtomSchema + '"))'
        resultId, result = self.xmldb.executeQuery(validationQuery)
        if result['hits'] == 0:
            errorMessage = "Validation did not complete successfully - please retry"
        elif result['hits'] > 1:
            errorMessage = "More than one atom was validated - expecting only a single atom validation - please retry"
    except Exception as e:
        # str(e), not e.message - the latter is deprecated and often empty
        errorMessage = "Error encountered whilst validating atom: '%s'" % str(e)

    # only remove the temporary file if it was actually written - a failed
    # delete of a non-existent file would otherwise mask the real error
    if tempFileCreated:
        logging.info("Deleting temporary file in eXist")
        self.deleteEXistFile(atomPath)

    if errorMessage:
        logging.error(errorMessage)
        raise SystemError(errorMessage)

    report = ET.fromstring(self.xmldb.retrieve(resultId, 0))
    status = report.findtext('status')

    # retrieve the error detail if invalid
    errors = []
    if status == 'invalid':
        logging.info("Atom is invalid - details as follows:")
        for message in report.findall('message'):
            lineNo = message.attrib.get('line')
            colNo = message.attrib.get('column')
            level = message.attrib.get('level')
            repeat = message.attrib.get('repeat')
            # message elements may be empty - treat as blank text
            errorText = message.text or ''
            # remove the meaningless error type prefix from the message
            if errorText.startswith('cvc-'):
                errorText = ':'.join(errorText.split(':')[1:])
            detail = "%s at line %s, column %s: %s" %(level, lineNo, colNo, errorText)
            if repeat:
                detail += " (%s times)" %repeat

            if isDebug:
                errors.append(detail)
            else:
                errors.append(errorText)
            logging.info(detail)
    else:
        logging.info("Atom is valid")

    logging.info("Validation complete")
    return errors
---|
249 | |
---|
250 | |
---|
def __setUpEXistAtomCollections(self):
    '''
    Ensure the collection structure needed for the atoms is available in eXist.
    For both the base and the backup collection this creates the collection
    itself; then, under each of its old / published / small-p published /
    working sub-collections, it creates the DE, deployment, deployments and
    granule sub-collections. The DIF and provider feed collections are also
    created.
    '''
    logging.info("Ensuring required collections are available in eXist")
    stateCollections = [ec.OLD_COLLECTION_PATH, ec.PUBLISHED_COLLECTION_PATH,
                        ec.SMALL_P_PUBLISHED_COLLECTION_PATH, ec.WORKING_COLLECTION_PATH]
    typeCollections = [ec.DE_COLLECTION_PATH, ec.DEPLOYMENT_COLLECTION_PATH,
                       ec.DEPLOYMENTS_COLLECTION_PATH, ec.GRANULE_COLLECTION_PATH]
    for col in [ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH]:
        # the parent only needs creating once, not once per state collection
        self.xmldb.createCollection(col)
        for stateCol in stateCollections:
            self.xmldb.createCollection(col + stateCol)
            for typeCol in typeCollections:
                self.xmldb.createCollection(col + stateCol + typeCol)

    self.xmldb.createCollection(ec.DIF_COLLECTION_PATH)
    self.xmldb.createCollection(ec.PROVIDER_FEED_PATH)
    logging.info("Required collections available")
---|
269 | |
---|
270 | |
---|
def __addAtomSchema(self):
    '''
    Store the atom, MOLES and combined atom/MOLES schemata (as <name>.xsd)
    in the base atom collection, so that input atoms can be validated in eXist.
    '''
    logging.info("Adding atom schema to eXist")
    queries = self.xmldb.xq
    for schemaName in (queries.ATOM_SCHEMA, queries.MOLES_SCHEMA, queries.ATOM_MOLES_SCHEMA):
        schemaXML = queries.getSchema(schemaName)
        self.createEXistFile(schemaXML, ec.BASE_COLLECTION_PATH, schemaName + '.xsd')
    logging.info("- schema added")
---|
282 | |
---|
283 | |
---|
def __loadDBDetails(self, configFile):
    '''
    Read the eXist DB connection details from the config file.
    The file must hold at least three whitespace-separated tokens - hostname,
    username and password, in that order; any further tokens are ignored.
    @param configFile: path to the DB config file
    @raise ValueError: if the file does not exist or holds fewer than three tokens
    '''
    logging.info("Loading DB config data")
    # Check this file exists
    if not os.path.isfile(configFile):
        errorMessage = "Could not find the DB config file, %s; please make sure this " \
            "is available from the running directory" %configFile
        logging.error(errorMessage)
        raise ValueError(errorMessage)

    # context manager ensures the file handle is always released
    with open(configFile, "r") as dbinfoFile:
        dbinfo = dbinfoFile.read().split()

    if len(dbinfo) < 3:
        errorMessage = 'Incorrect data in DB config file'
        logging.error(errorMessage)
        raise ValueError(errorMessage)
    self.eXistDBHostname = dbinfo[0]
    self._username = dbinfo[1]
    self._pw = dbinfo[2]
    logging.info("DB config data loaded")
---|
305 | |
---|
306 | |
---|
def __lookupEXistFile(self, docPath):
    '''
    Run an XQuery doc() lookup for the given path in eXist.
    @param docPath: path to doc to look up
    @return: the query's result id (used to retrieve the doc) if the doc
    exists, None otherwise
    '''
    logging.info("Retrieving info for file, '%s'" %docPath)

    query = 'doc("' + docPath + '")'
    resultId, summary = self.xmldb.executeQuery(query)

    if summary['hits'] != 0:
        logging.info("Found file - returning result ID")
        return resultId
    logging.info("File does not exist in eXist DB")
    return None
---|
322 | |
---|
323 | |
---|
def isNewEXistCollection(self, collectionPath):
    '''
    Check, via an XQuery collection() lookup, whether a collection is new to eXist
    @param collectionPath: path to collection to look up
    @return: True if the lookup returns no hits, False otherwise
    NB(review): a collection that exists but holds no documents presumably
    also returns no hits, so would be reported as new - confirm against eXist
    '''
    logging.info("Checking if collection, '%s', exists in eXist" %collectionPath)

    query = 'collection("' + collectionPath + '")'
    _, summary = self.xmldb.executeQuery(query)
    isNew = summary['hits'] == 0
    if isNew:
        logging.info("Collection does not exist in eXist DB")
    else:
        logging.info("Found collection")
    return isNew
---|
338 | |
---|
339 | |
---|
def getEXistFile(self, docPath):
    '''
    Fetch the contents of a document from eXist
    @param docPath: the path of the doc to retrieve
    @return: contents of document if it exists, None otherwise
    '''
    resultId = self.__lookupEXistFile(docPath)

    # a result id of 0 is valid - only other falsy ids mean "not found"
    found = bool(resultId) or resultId == 0
    if not found:
        logging.info("No file found - nothing to retrieve")
        return None

    logging.info("Found file - now retrieving content")
    return self.xmldb.retrieve(resultId, 0)
---|
355 | |
---|
356 | |
---|
def isNewEXistFile(self, docPath):
    '''
    Test if a file already exists in eXist
    @param docPath: path of file in eXist to look up
    @return: True if a new file, False if otherwise
    '''
    logging.info("Checking if file, '%s', exists in eXist DB" %docPath)

    resultId = self.__lookupEXistFile(docPath)

    # NB, a result id of 0 is a valid handle (getEXistFile treats it as found),
    # so it must not be mistaken for "file not found" - otherwise existing
    # files would be overwritten without a backup
    return not resultId and resultId != 0
---|
371 | |
---|
372 | |
---|
def __addTimeStamp(self, fileName):
    '''
    Return fileName with the current local time appended as _YYYY-MM-DDTHH_MM_SS.
    If the name contains a '.', the timestamp goes before the text after the
    last '.' (the file type); otherwise it is added at the end.
    '''
    stamp = datetime.datetime.today().strftime("%Y-%m-%dT%H_%M_%S")
    stem, dot, extension = fileName.rpartition(".")
    if not dot:
        return fileName + "_" + stamp
    return stem + "_" + stamp + dot + extension
---|
385 | |
---|
386 | |
---|
def __removeTimeStamp(self, fileName):
    '''
    Remove a timestamp of the form _YYYY-MM-DDTHH_MM_SS from a file name
    @param fileName: file name, possibly containing a timestamp
    @return: the name with the last such timestamp removed, or fileName
    unchanged if it contains none
    '''
    # raw string, so '\d' is not treated as an (invalid) string escape
    match = re.search(r'(.*)(_\d{4}-\d{2}-\d{2}T\d{2}_\d{2}_\d{2})(.*)', fileName)
    if match:
        return match.group(1) + match.group(3)

    return fileName
---|
396 | |
---|
def backupEXistFile(self, collection, fileName, runAsynch = True):
    '''
    Back up a file that exists in the eXist DB by writing a copy of its
    contents, with a timestamp added to the name, into the matching backup
    collection.
    @param collection: path of the collection holding the file
    @param fileName: name of the file to back up
    @keyword runAsynch: if True, the copy is written by a backingUpThread in
    a separate thread; otherwise it is written before returning
    @raise SystemError: if the file contents cannot be retrieved
    @return: path to the new backup file (NB, with runAsynch the file may not
    have been written yet when this returns)
    '''
    collection = collection if collection.endswith('/') else collection + '/'
    sourcePath = collection + fileName
    logging.info("Backing up file, '%s', in eXist DB" %sourcePath)

    logging.debug("Firstly, retrieve file contents from eXist")
    doc = self.getEXistFile(sourcePath)
    if not doc:
        errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %sourcePath
        logging.error(errorMessage)
        raise SystemError(errorMessage)

    # map the collection onto the equivalent backup collection
    backupCollection = collection \
        .replace(ec.BASE_COLLECTION_PATH, ec.BACKUP_COLLECTION_PATH) \
        .replace(ec.NDG_A_COLLECTION_PATH, ec.NDG_A_COLLECTION_PATH_BACKUP)
    backupFileName = self.__addTimeStamp(fileName)

    if not runAsynch:
        self.createEXistFile(doc, backupCollection, backupFileName)
    else:
        backingUpThread(self, doc, backupCollection, backupFileName).start()

    return backupCollection + backupFileName
---|
438 | |
---|
439 | |
---|
def createEXistFile(self, xml, collection, fileName):
    '''
    Add the input file to the eXist DB, creating its collection first if
    needed and overwriting any existing file of the same name
    @param xml: contents of xml file to create in eXist
    @param collection: path of the collection to store the file in - with or
    without a trailing '/'
    @param fileName: name of file to add in eXist
    @raise SystemError: if the store command reports failure
    @return: True, if file created successfully
    '''
    logging.info("Adding file, '%s' to eXist DB collection, '%s'" \
                 %(fileName, collection))
    logging.debug("data: %s" %xml)

    # create the collection, in case it doesn't already exist - NB, this won't overwrite anything
    self.createCollections([collection])
    # most callers pass collections ending in '/' - don't build '//' paths
    separator = '' if collection.endswith('/') else '/'
    status = self.xmldb.storeXML(xml, collection + separator + fileName, overwrite=1)
    if not status:
        errorMessage = "Command to create file in eXist did not complete successfully - exiting"
        logging.error(errorMessage)
        raise SystemError(errorMessage)

    logging.info("File added to eXist")
    return True
---|
462 | |
---|
463 | |
---|
def deleteEXistFile(self, docPath):
    '''
    Remove a document from the eXist DB
    @param docPath: path of document to delete
    @raise SystemError: if the remove command reports failure
    @return: True, if file deleted successfully
    '''
    logging.info("Deleting file, '%s', from eXist DB" %docPath)

    if self.xmldb.removeDoc(docPath):
        logging.info("File deleted from eXist")
        return True

    errorMessage = "Command to delete file in eXist did not complete successfully - exiting"
    logging.error(errorMessage)
    raise SystemError(errorMessage)
---|
480 | |
---|
481 | |
---|
def createOrUpdateEXistFile(self, xml, collection, fileName):
    '''
    Check if a file already exists in eXist; if it does, back up the
    existing file first, then (in either case) write the new file
    @param xml: contents of xml file to create/update in eXist
    @param collection: path of the collection to store the file in - with or
    without a trailing '/'
    @param fileName: name of file to add in eXist
    '''
    logging.info("Creating or updating file in eXist...")
    # join with exactly one '/' - otherwise, for a collection without a
    # trailing slash, an existing file is not found and gets overwritten
    # without a backup
    docPath = collection if collection.endswith('/') else collection + '/'
    docPath += fileName
    if not self.isNewEXistFile(docPath):
        self.backupEXistFile(collection, fileName)

    self.createEXistFile(xml, collection, fileName)
---|
496 | |
---|
497 | |
---|
def getAllAtomIDs(self):
    '''
    Retrieve all the atom IDs in the atoms directory - NB, this can be a
    quick way of producing a cache of data to check - e.g. to avoid multiple
    calls to getAtomFileCollectionPath
    @return: ids - list of the Atom-namespace repositoryID of each result
    entry; empty if the query finds nothing
    '''
    logging.info("Retrieving all atom ids")
    query = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
    resultId, summary = self.xmldb.executeQuery(query)
    if summary['hits'] == 0:
        return []

    results = ET.fromstring(self.xmldb.retrieve(resultId, 0))
    ids = [member.findtext('{http://www.w3.org/2005/Atom}repositoryID')
           for member in results]
    logging.debug("Found ids, '%s'" %ids)
    return ids
---|
521 | |
---|
522 | |
---|
def getAllAtomCollections(self):
    '''
    Get all atom collection paths and store in a dictionary - for easy
    reference when doing lots of things at once
    @return: dict with key/val of atomID/collectionPath (the collection path
    has no trailing '/'); empty if no atoms are found
    '''
    logging.info("Retrieving all atom collection paths")
    query = self.xmldb.xq.actual('atomList', '/db/atoms', '', '')
    resultId, summary = self.xmldb.executeQuery(query)
    if summary['hits'] == 0:
        # an empty dict, not a list - callers use .get() on the result
        return {}

    results = ET.fromstring(self.xmldb.retrieve(resultId, 0))
    colData = {}
    for member in results:
        fullPath = member.findtext('{http://www.w3.org/2005/Atom}fileName')
        dirName, _, baseName = fullPath.rpartition('/')
        # the atom ID is the file name up to its first '.'
        colData[baseName.split('.')[0]] = dirName

    logging.debug("Finished looking up atom paths")
    return colData
---|
549 | |
---|
550 | |
---|
def getAtomPublicationState(self, atomID):
    '''
    Retrieve the publication state of the specified atom - by checking the
    collection it is in
    @param atomID: atom id to look up
    @return: AtomState for the atom. NB, if the ID is not found, assume we're
    dealing with a new atom and return the working state
    '''
    logging.debug("Finding atom publication state")
    path = self.getAtomFileCollectionPath(atomID)
    # an unknown ID yields no path - fall through to the working state
    # rather than failing on the substring search
    if path:
        for state in AtomState.allStates.values():
            if ('/%s' %state.collectionPath) in path:
                logging.debug("- state found: '%s'" %state.title)
                return state

    logging.debug("- state not found - returning WORKING state")
    return AtomState.WORKING_STATE
---|
568 | |
---|
569 | |
---|
def getAtomFileCollectionPath(self, atomID):
    '''
    Find the eXist collection holding the atom file with the given id
    @param atomID: atom id to look up
    @return: collection path, with a trailing '/', if the atom exists; None otherwise
    NB(review): atomID is substituted into the XQuery unescaped - confirm it
    never comes from untrusted input
    '''
    logging.info("Looking up collection path for atom ID, '%s'" %atomID)
    query = self.xmldb.xq['atomFullPath'] \
        .replace('TargetCollection', ec.BASE_COLLECTION_PATH) \
        .replace('LocalID', atomID)

    resultId, summary = self.xmldb.executeQuery(query)
    if summary['hits'] == 0:
        logging.info("No document found with the specified ID")
        return None

    pathElement = ET.fromstring(self.xmldb.retrieve(resultId, 0, {}))
    collPath = pathElement.text + '/'
    logging.debug("Found collection path, '%s'" %collPath)
    return collPath
---|
593 | |
---|
594 | |
---|
def deleteAtomInExist(self, atom):
    '''
    Delete the given atom's file from the eXist DB; the path is the atom's
    default collection path followed by its atom name
    @param atom: Atom whose file should be removed
    '''
    logging.info("Deleting atom from eXist")
    collectionPath = atom.getDefaultCollectionPath()
    self.deleteEXistFile(collectionPath + atom.atomName)
    logging.info("Atom deleted")
---|
604 | |
---|
605 | |
---|
def changeAtomPublicationStateInExist(self, atom, newState):
    '''
    Move an atom in eXist from its current publication state to a new one
    @param atom: the Atom data model whose publication state needs to change
    @param newState: an AtomState object representing the new publication
    state of the atom
    @return atom: atom data model with its state set to newState
    '''
    previousState = atom.state
    logging.info("Changing the publication state of atom - from '%s' to '%s'"
                 %(previousState.title, newState.title))

    # write the atom under its new state first - so no data is lost if this fails
    atom.state = newState
    self.createEXistFile(atom.toPrettyXML(), atom.getDefaultCollectionPath(),
                         atom.atomName)

    # switch back temporarily so the old copy's location is used for the delete
    atom.state = previousState
    self.deleteAtomInExist(atom)
    logging.info("- atom created in new publication state and removed from old one")
    atom.state = newState

    # published atoms also need feeds updating (+ DIFs creating)
    if atom.isPublished():
        self.runAsynchAtomPublish(atom)

    return atom
---|
635 | |
---|
636 | |
---|
637 | |
---|
def createAtomInExist(self, atom, replaceAtom = True, runAsynch = True):
    '''
    Create (or replace) an atom in the eXist DB - using the atom contents to
    work out the location + data set ID
    @param atom: Atom object to create in the DB
    @keyword replaceAtom: if False and the atom is already available in eXist,
    raise a DuplicateError instead of replacing it
    @keyword runAsynch: if True and an existing file has to be backed up, do
    the backup asynchronously in a separate thread
    @raise ValueError: if atom is empty or not an Atom instance
    @raise DuplicateError: if replaceAtom is False and the atom already exists
    @return: the atom, with its dataset ID set
    '''
    logging.info("Creating atom in eXist")
    if not atom:
        raise ValueError("Input is not an object - cannot create in eXist")
    if not isinstance(atom, Atom):
        raise ValueError("Input object is not an Atom object - cannot create in eXist")

    # if the atom has no dataset ID, generate and add one
    # NB, this should only be the case when the atom is being created
    # via the web interface
    isNew = False
    if not atom.datasetID:
        isNew = True
        atom.setDatasetID(atom.atomTypeID + '_' + str(uuid.uuid1()))

    if self.collections is not None: # cope with empty dict
        eXistCollection = self.collections.get(atom.datasetID)
    else:
        eXistCollection = self.getAtomFileCollectionPath(atom.datasetID)

    if not eXistCollection:
        # collection not found - assume we're dealing with a new atom and
        # use its default collection
        eXistCollection = atom.getDefaultCollectionPath()

        # check if we need a new provider feed set up
        providerCollection = ec.PROVIDER_FEED_PATH + atom.ME.providerID + '/'
        if self.isNewEXistCollection(providerCollection):
            logging.info("Creating feed for new provider ID")
            self.createCollections([providerCollection])
            self.feedClient.createAtomFeed(providerCollection,
                self.feedClient.PROVIDERLEVEL_ATOM_FEED_TITLE %atom.ME.providerID)

    elif isNew:
        # the freshly generated ID clashes with an existing atom - this can't
        # be allowed, so retry with a new ID; pass the caller's options on and
        # return the result, so callers still get the created atom back
        atom.datasetID = None
        return self.createAtomInExist(atom, replaceAtom = replaceAtom,
                                      runAsynch = runAsynch)

    else:
        # atom already exists - back it up before replacing it
        if not replaceAtom:
            raise DuplicateError('An atom with the specified ID (%s) already exists in eXist' \
                                 %atom.datasetID)
        # store name of backup - to allow restore, if subsequent ops fail
        self.backupName = self.backupEXistFile(eXistCollection, atom.atomName, \
                                               runAsynch = runAsynch)

        # also change updated date to current time
        # NB(review): confirm whether brand-new atoms should get this too
        atom.updatedDate = datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ")

    self.createEXistFile(atom.toPrettyXML(), eXistCollection, atom.atomName)

    logging.info("Atom created in eXist")

    # lastly, if we're dealing with a published atom, update DIF records +
    # feeds
    if atom.isPublished():
        self.runAsynchAtomPublish(atom)

    return atom
---|
710 | |
---|
def runAsynchAtomPublish(self, atom):
    '''
    Start a publishingThread to publish the atom in the background and
    return without waiting for it to finish
    @param atom: Atom to publish
    '''
    publishingThread(self, atom).start()
---|
714 | |
---|
715 | |
---|
def __publishAtom(self, atom):
    '''
    Publish an atom: if it is a data entity (isDE), first create a DIF
    document from it, then add the atom to the feeds.
    '''
    isDataEntity = atom.isDE()
    if isDataEntity:
        self.createDIFDocumentFromAtom(atom)
    self.feedClient.addAtomToFeeds(atom)
---|
725 | |
---|
726 | |
---|
727 | |
---|
728 | def restoreBackup(self, docPath): |
---|
729 | ''' |
---|
730 | Restore the backed up file - effectively recreating in the non-backup collection |
---|
731 | @param docPath: path to file to backup |
---|
732 | ''' |
---|
733 | logging.info("Restoring file, '%s' in eXist" %docPath) |
---|
734 | doc = self.getEXistFile(docPath) |
---|
735 | |
---|
736 | if not doc: |
---|
737 | errorMessage = "Could not retrieve file contents (%s) to backup - exiting." %docPath |
---|
738 | logging.error(errorMessage) |
---|
739 | raise SystemError(errorMessage) |
---|
740 | |
---|
741 | bits = docPath.split('/') |
---|
742 | fileName = bits[-1] |
---|
743 | collection = '/'.join(bits[0:-1]) |
---|
744 | # remove timestamp from filename |
---|
745 | fileName = self.__removeTimeStamp(fileName) |
---|
746 | |
---|
747 | # Now adjust the collection to map to the backup dir |
---|
748 | collection = collection.replace(ec.BACKUP_COLLECTION_PATH, ec.BASE_COLLECTION_PATH) |
---|
749 | collection = collection.replace(ec.NDG_A_COLLECTION_PATH_BACKUP, ec.NDG_A_COLLECTION_PATH) |
---|
750 | |
---|
751 | self.createEXistFile(doc, collection, fileName) |
---|
752 | logging.info("File restored") |
---|
753 | |
---|
754 | |
---|
755 | def transformAtomIntoDIF(self, atom): |
---|
756 | ''' |
---|
757 | Transform an atom into a DIF document - using an XQuery transform ran |
---|
758 | in eXist |
---|
759 | @param atom: the Atom data model to convert |
---|
760 | @return: the produced DIF document |
---|
761 | ''' |
---|
762 | logging.info("Creating DIF record from atom - using XQuery transform") |
---|
763 | |
---|
764 | # get the query and set this up to use properly |
---|
765 | targetCollection = atom.getPublicationStatePath() |
---|
766 | providerID = atom.ME.providerID |
---|
767 | # remove the trailing forward slash - otherwise the xquery won't work |
---|
768 | if targetCollection.endswith('/'): |
---|
769 | targetCollection = targetCollection[0:-1] |
---|
770 | xquery = self.xmldb.xq.actual('atom2DIF', targetCollection, \ |
---|
771 | providerID, atom.datasetID) |
---|
772 | |
---|
773 | discovery_id,s = self.xmldb.executeQuery(xquery) |
---|
774 | if s.has_key('hits') and not s['hits']: |
---|
775 | raise Exception("XQuery produced no results - suggesting a problem with the query") |
---|
776 | doc = self.xmldb.retrieve(discovery_id,0) |
---|
777 | |
---|
778 | # add various missing info |
---|
779 | # get the organisation data for the repository |
---|
780 | # - NB, store for re-use to avoid multiple lookups of the same info |
---|
781 | if self.dataCentres.has_key(providerID): |
---|
782 | dataCentre = self.dataCentres[providerID] |
---|
783 | else: |
---|
784 | dataCentre = getDataCentreDIFElement(providerID, self.xmldb) |
---|
785 | self.dataCentres[providerID] = dataCentre |
---|
786 | |
---|
787 | # add various other data to atoms - to make up with the incomplete data spec there |
---|
788 | doc = addOrgData(doc, targetCollection, dataCentre) |
---|
789 | doc = expandParametersData(doc) |
---|
790 | doc = addStandardKeywords(doc) |
---|
791 | |
---|
792 | logging.info("Transform completed successfully - returning DIF doc") |
---|
793 | return doc |
---|
794 | |
---|
795 | |
---|
796 | def createDIFDocumentFromAtom(self, atom): |
---|
797 | ''' |
---|
798 | Transform an atom into a DIF document and store this in eXist |
---|
799 | @param atom: the Atom data model to convert |
---|
800 | ''' |
---|
801 | logging.info("Creating and storing DIF document") |
---|
802 | doc = self.transformAtomIntoDIF(atom) |
---|
803 | fileName = atom.atomName.replace('.atom', '.dif') |
---|
804 | |
---|
805 | # now add to eXist |
---|
806 | providerID = atom.ME.providerID |
---|
807 | collectionPath = ec.DIF_COLLECTION_PATH + providerID |
---|
808 | # NB, check if provider collection exists; if not, add this with a feed |
---|
809 | # NB, the collection will be created along with the DIF file - so set the |
---|
810 | # feed up after the file has been created |
---|
811 | setupFeed = False |
---|
812 | if self.isNewEXistCollection(collectionPath): |
---|
813 | setupFeed = True |
---|
814 | |
---|
815 | self.createOrUpdateEXistFile(doc, collectionPath, |
---|
816 | fileName) |
---|
817 | if setupFeed: |
---|
818 | logging.info("Creating feed for new provider ID") |
---|
819 | self.createCollections([collectionPath]) |
---|
820 | self.feedClient.createAtomFeed(collectionPath, |
---|
821 | self.feedClient.PROVIDERLEVEL_DIF_FEED_TITLE %providerID + '/') |
---|
822 | logging.info("DIF document created and stored") |
---|
823 | |
---|