source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6865

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6865
Revision 6865, 29.3 KB checked in by sdonegan, 10 years ago (diff)

Upgrade a set of bug fixes to deal with authors, upgrade stats, reporting and deletion of records. Also includes update to Utilities.py for methods to provide string representation of the ISO data object doubledictionary

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.WARNING
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False
68                for arg in args:
69                       
70                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile']                                             
71                       
72                        bits = arg.split('=')
73                        if len(bits) == 2:
74                                if bits[0] == keywordArgs[0]:
75                                        self.setIngestFromDate(bits[1])
76                                elif bits[0] == keywordArgs[1]:
77                                        self.setPollInterval(bits[1])
78                                elif bits[0] == keywordArgs[8]:
79                                        print " - Running with specified config file!"
80                                        self.setOaiConfigFile(bits[1])
81                                        checkConfInArgs = True
82                                elif bits[0] == keywordArgs[2]:
83                                        print " - Running in single file ingestion mode!"
84                                        self.setIndFileToIngest(bits[1])
85                                elif bits[0] not in keywordArgs:
86                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
87                                        sys.exit(2)
88                                else:
89                                        setattr(self, bits[0], bits[1])
90                       
91                if checkConfInArgs is False:
92                        print "\nWARNING: You must now include the ingestConfigFile arguement for successful operation"
93                        sys.exit(2)
94               
95                print self.lineSeparator
96               
97                # NB, this is a slight fudge as cannot get the detailed logging to work
98                # without setting up a new logger here - which means we get two loggers
99                # outputing data. The initial call to logging needs to be tracked down
100                # and configured correctly, so this can be avoided...
101#               self.logger = logging.getLogger()
102#               self.logger.setLevel(loggingLevel)
103
104                # create console handler and set level to debug
105#               ch = logging.StreamHandler()
106#               ch.setLevel(loggingLevel)
107                # create formatter
108#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
109                # add formatter to ch
110#               ch.setFormatter(formatter)
111                # add ch to logger
112#               self.logger.addHandler(ch)
113               
114                return args
115
116
117        def getID(self, filename):
118                '''
119                Gets the identifier out of an input metadata xml record.
120                @param filename - name of document file being processed
121                @return: ID - id to use to refer to the document
122                '''
123                logging.info("Retrieving identifier for metadata record " + filename)
124       
125                xml=file(filename).read()
126               
127                ID = idget(xml)
128               
129                return ID
130       
131       
132        def addFileToPostgresDB(self, filename):
133                '''
134                Add a file to the postgres DB - extracting and storing all the required
135                data in the process
136                @param filename: full path of file to add to postgres DB
137                '''
138                logging.info("Adding file, " + filename + ", to postgres DB")
139               
140                numSlash = len(filename.split('/'))
141                               
142                shortFilename = filename.split('/')[numSlash - 1]
143               
144                self.recordAttemptToIngestDiscoveryID = ""
145               
146                #need to generate a local version of the iso data model (first call was to run through all available files
147                #and convert & place into relevant dirs as necessary
148                #generate the ISO object from the converted ISO here now..
149               
150                try:
151                                                                                               
152                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
153                               
154                        if self.isoDataModel.createISOdataStructure() is True:
155                                logging.info("ISO extractor worked fine!")
156                       
157                       
158                        elif self.isoDataModel.createISOdataStructure() is True:
159                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
160                                                #sys.exit() # TODO fix these
161                                return
162                        else:
163                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
164                                                #sys.exit()
165                                return
166                except:
167                        logging.error("Could not generate ISO XML!")
168                        return
169               
170                #get the converted ISO into a local var
171                try:
172                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
173                        self.isoXML = file(filename).read()
174                        self.originalXMLdoc = self.isoXML
175                                       
176                except:
177                        logging.warn("Could not extract converted ISO xml to local variable!")
178                        self.isoXML = ''
179                               
180               
181                # first of all create a PostgresRecord - this object represents all the data required
182                # for a DB entry
183                dao = None
184               
185                try:
186                       
187                        #record whats bout to be ingestd                       
188                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
189                                               
190                        #In new ingest system all vals parsed from iso xml are returned as either lists of dictionaries
191                       
192                        #record whats attempting to be ingested
193                        record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
194                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
195                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML)                       
196                       
197                        # Now create the data access object to interface to the DB                     
198                        dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
199                       
200                        # Finally, write the new record
201                        # 0 if failed, 1 if update, 2 if create
202                        returnCode = dao.createOrUpdateRecord() 
203                                               
204                        if returnCode == 2:
205                                self._no_files_ingested += 1
206                               
207                        elif returnCode == 1:
208                                self._no_files_changed += 1
209                               
210                       
211                except:
212                       
213                       
214                        #if error encountered, add to failure lisr
215                        logging.error("Could not update: " + filename)                 
216                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
217                        self.updateFailList.append(originalRecordFilename)
218                       
219                        logging.error("Exception thrown - detail: ")
220                        errors = sys.exc_info()
221                        logging.error(errors)
222                        self._error_messages += "%s\n" %str(errors[1])
223                       
224                        if dao:
225                                logging.info("Removing record and its associated info from DB")
226                                logging.info("- to allow clean ingestion on rerun")
227                                try:
228                                        dao.deleteOriginalRecord()
229                                except:
230                                        logging.error("Problem encountered when removing record: ")
231                                        logging.error(sys.exc_info())
232                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
233
234                        self._no_problem_files += 1
235                       
236                        logging.info("Continue processing other files")
237                       
238                return self.recordAttemptToIngestDiscoveryID
239                       
240       
241        def getProcessingConfig(self,configFilePath):
242               
243                '''
244                Fed up with messing about with hardcoded directory paths. 
245                This method to get relevant values out of the oai_document_ingester.config file
246               
247                Returns a dictionary of values with keys:
248               
249                #directory in which the code resides
250                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
251
252                #base directory in which metadata is extracted to and converted to
253                base_directory /home/badc/discovery_docs/ingestDocs/
254
255                #directory in which to write reports to
256                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
257
258                #path to the passwords file
259                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
260
261                #datacentre config file directory path
262                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
263                '''
264               
265                # Check this file exists; if not, assume an invalid datacentre has been specified
266               
267                if not os.path.isfile(configFilePath):
268                    sys.exit("ERROR: Could not find the processing config file")
269                   
270                processingConfig = {}
271                   
272                processing_config_file = open(configFilePath, "r")
273               
274                for line in processing_config_file.readlines():
275                        words  = string.split(line)
276                        if len(words) == 0:
277                                continue
278                        elif words[0] == 'code_directory':
279                                processingConfig['code_directory'] = words[1]
280                                                                                               
281                        elif words[0] == 'base_directory':
282                                processingConfig['base_directory'] = words[1]
283                               
284                        elif words[0] == 'reporting_directory':
285                                processingConfig['reporting_directory'] = words[1]
286                               
287                        elif words[0] == 'passwords_file':
288                                processingConfig['passwords_file'] = words[1]
289                               
290                        elif words[0] == 'datcentre_configs':
291                                processingConfig['datcentre_configs'] = words[1]
292                                       
293                        elif words[0] == 'NDG_redirect_URL':
294                                processingConfig['NDG_redirect_URL'] = words[1]
295                               
296                        elif words[0] == 'ingestConfig':
297                                if not os.path.isfile(words[1]):
298                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
299                                else:
300                                        processingConfig['ingestConfig'] = words[1]
301                               
302                        elif words[0] == 'acceptedFormats':
303                                processingConfig['acceptedFormats'] = words[1]
304                               
305                        elif words[0] == 'saxonJarFile':
306                                if not os.path.isfile(words[1]):
307                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
308                                else:
309                                        processingConfig['saxonJarFile'] = words[1]
310                        elif words[0] == 'xpathIsoClass':
311                               
312                                #build a dictionary with format as the key                             
313                                isoClass = words[1].split(":")                         
314                               
315                                if 'xpathIsoClass' not in processingConfig.keys():
316                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
317                                else:
318                                        temp = processingConfig['xpathIsoClass']                                       
319                                        temp.update({isoClass[0]:isoClass[1]})
320                                        processingConfig['xpathIsoClass'] = temp
321                               
322                        elif words[0] == 'xqueryStubIsoGen':
323                               
324                                #build a dictionary with format as the key
325                                isoClass = words[1].split(":")
326                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
327                               
328                        elif words[0] == 'xqueryConversions':
329                               
330                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
331                                processingConfig['xqueryConversions'] = words[1].split(",")
332                               
333                        elif words[0] == 'currentMEDIN':
334                               
335                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
336                                processingConfig['currentMEDIN'] = words[1]
337                               
338                        elif words[0] == 'exceptXqConv':
339                               
340                                #build a dictionary with format as the key
341                                xqClass = words[1].split(":")
342                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
343                               
344                        elif words[0] == 'xqDocTypes':
345                               
346                                #build a dictionary with format as the key
347                                xqClass = words[1].split(":")
348                               
349                                if 'xqDocTypes' not in processingConfig.keys():
350                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
351                                else:
352                                        temp = processingConfig['xqDocTypes']                                   
353                                        temp.update({xqClass[0]:xqClass[1]})
354                                        processingConfig['xqDocTypes'] = temp
355                                       
356                processing_config_file.close()
357                       
358                return processingConfig
359       
360       
361        def getConfigDetails(self, datacentre):
362                '''
363                Get the harvested records directory and groups for this datacentre from the
364                datacentre specific config file.  The harvested records directory depends on the
365                datacentres OAI base url, the set and format. These have to be know up-front.
366                The groups denote which 'portal groups' they belong to - for limiting searches to
367                say NERC-only datacentres records.
368                Groups are added to the intermediate MOLES when it is created.
369                @param datacentre: datacentre to use when looking up config file
370                '''
371                # initialise the variables to retrieve from the config file
372                self._harvest_home = ""
373                self._datacentre_groups = ""
374                self._datacentre_format = ""
375                self._datacentre_namespace = ""
376                self._NDG_dataProvider = False
377                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
378               
379               
380                #note changed to production buildout path
381                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
382               
383                #fed up with this rubbish causing problems - use a normal file opener.
384                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
385               
386               
387                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
388                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
389               
390                # Check this file exists; if not, assume an invalid datacentre has been specified
391                if not os.path.isfile(self._datacentre_config_filename):
392                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
393                        "specified (%s) is invalid\n" %datacentre)
394                   
395               
396                datacentre_config_file = open(self._datacentre_config_filename, "r")
397               
398                for line in datacentre_config_file.readlines():
399                        words  = string.split(line)
400                        if len(words) == 0:
401                                continue
402                        elif words[0] == 'host_path':
403                                self._harvest_home = words[1].rstrip()
404                        elif words[0] == 'groups':
405                                self._datacentre_groups = words[1:]
406                        elif words[0] == 'format':
407                                self._datacentre_format = words[1]
408                        elif words[0] == 'namespace':
409                                self._datacentre_namespace = words[1]
410                        elif words[0] == 'NDG_dataProvider':
411                                self._NDG_dataProvider = True
412                        elif words[0] == 'feed':
413                                self._IngestViaFeed = True
414                       
415                datacentre_config_file.close()
416               
417                #stop ingest on this datacentre if process thread is OAI and the config says FEED
418                if ((self.processThread == 'OAI') and self._IngestViaFeed):
419                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
420                                               
421                if self._harvest_home == "":
422                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
423               
424                logging.info("harvested records are in " + self._harvest_home)
425               
426                if self._datacentre_groups == "":
427                    logging.info("No groups/keywords set for datacentre " + datacentre)
428                else:
429                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
430               
431                if self._datacentre_format == "":
432                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
433               
434                logging.info("format being harvested: " + self._datacentre_format)
435               
436                if self._datacentre_namespace == "":
437                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
438               
439                logging.info("datacentre namespace: " + self._datacentre_namespace)
440               
441                if self._NDG_dataProvider:
442                        logging.info("Datacentre classified as an NDG data provider")
443                else:
444                        logging.info("Datacentre is not classified as an NDG data provider")
445                logging.info(self.lineSeparator)
446               
447               
448        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
449                '''
450                Processes/renames the files (changed 08/01/07 to get id from inside file)
451                 - also replace any namespace declarations with a standard one which we know works in NDG
452                 NB, this copies files from the original dir to the discovery dir
453                 @param originals_dir: directory to convert files from
454                 @param discovery_dir: directory in which converted files will end up
455                 @return numfilesproc: counter of number of files processed
456                '''
457                numfilesproc = 0                       
458               
459                self.inputFileOrigFinal = {}
460               
461                logging.info(self.lineSeparator)
462                logging.info("Renaming files:")
463               
464                for filename in os.listdir(originals_dir):
465                       
466                        if not filename.endswith('.xml'):
467                                logging.warning('File %s is not xml format. Not processed'  %(filename))
468                                continue
469                       
470                        original_filename = originals_dir + filename
471                       
472                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
473                        if format == 'DIF_9.4':
474                                                       
475                                logging.info("Converting DIF to stub ISO...")                                   
476                               
477                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
478                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
479                                                                                                               
480                                #gets the metadata id from the xml
481                                metadataID=self.getID(original_filename)
482                                #import pdb
483                                #pdb.set_trace()
484                                                               
485                                repositoryName = self._datacentre_namespace
486                               
487                                #get the name of the file to be used in the xquery
488                                metadataFilename = filename.replace('.xml', '')
489                                                               
490                                #where is the file to be ingested located?
491                                metadataFileLoc = originals_dir
492                               
493                                #generate a new stubISO filename
494                                self.isoFormat= "stubISO"
495                                                               
496                               
497                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
498                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
499                                                                                                                       
500                                #get hold of the xml for the transformed ISO                           
501                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
502                                                               
503                                #create the iso file on the system
504                                tempStubISOfile = original_filename + ".stubISO.xml"
505                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
506                               
507                                #generate the ISO object from the converted ISO here now..
508                                try:
509                                                                                               
510                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
511                               
512                                        if isoDataModel.createISOdataStructure() is True:
513                                                logging.info("ISO extractor worked fine!")
514                       
515                       
516                                        elif isoDataModel.createISOdataStructure() is True:
517                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
518                                                #sys.exit() # TODO fix these
519                                                return
520                                        else:
521                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
522                                                #sys.exit()
523                                                return
524                                except:
525                                        logging.error("Could not generate ISO XML!")
526                                        return
527                               
528                                                               
529                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
530                                self.originalXMLdoc = ''
531                               
532                                try:
533                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
534                                        self.originalXMLdoc = file(original_filename).read()
535                                       
536                                except:
537                                        logging.warn("Could not extract original file to local variable!")
538                               
539                               
540                                #Back to main stream of ISO ingest
541                                if self._NDG_dataProvider:
542                                       
543                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
544                                                               
545                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
546                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
547                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
548                               
549                                       
550                                else:
551                                        ident = isoDataModel.datasetID[0][0]
552                                       
553                                        ident = ident.replace(":", "-")
554                                        ident = ident.replace("/", "-")
555                               
556                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
557                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
558                               
559                                       
560                               
561                                #now create this stub ISO file on system so can access it
562                                self.genIsoFile = discovery_dir + new_filename_short   
563                                                               
564                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
565                                #generate the redirect urls - now we have worked out the full discovery ndg name..
566                               
567                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
568                               
569                       
570                       
571                        #all other ISO formats/profiles to deal with.
572                        else:                           
573                                isoFormat = format
574                               
575                                self.originalXMLdoc = "null"
576                               
577                               
578                                #generate the ISO object from the ISO xml..                                                                             
579                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
580                               
581                                if isoDataModel.createISOdataStructure() is True:
582                                                logging.info("ISO extractor worked fine!")
583                       
584                                elif isoDataModel.createISOdataStructure() is True:
585                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
586                                                #sys.exit() # TODO fix these
587                                        return
588                                else:
589                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
590                                                #sys.exit()
591                                        return
592                               
593                                #Back to main stream of ISO ingest
594                                if self._NDG_dataProvider:
595                                                               
596                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
597                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
598                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
599                               
600                                else:
601                                       
602                                        ident = isoDataModel.datasetID[0][0]
603                               
604                                        ident = ident.replace(":", "-")
605                                        ident = ident.replace("/", "-")
606                               
607                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
608                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
609                               
610                                #now create this stub ISO file on system so can access it
611                                self.genIsoFile = discovery_dir + new_filename_short   
612                               
613                                #generate the redirect urls - now we have worked out the full discovery ndg name..
614                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
615                               
616                                #get the converted ISO into a local var
617                                try:
618                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
619                                        self.isoXML = file(self.genIsoFile).read()
620                                        self.originalXMLdoc = self.isoXML
621                                       
622                                except:
623                                        logging.warn("Could not extract converted ISO xml to local variable!")
624                                        self.isoXML = ''
625                                       
626                        logging.info("original file = " + original_filename)
627                        logging.info("newfile = " + new_filename)
628                               
629                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
630                        #this links a derived filename in processing dir with original filename
631                        #get basic filename from the path+filename passed to this function to use as key               
632                        self.inputFileOrigFinal[new_filename_short]=filename
633                       
634                        '''
635                        TODO: do we need this namespace corrector anymore as not using MOLES?
636               
637                        # now correct any namespace issues
638                        try:
639                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
640                        except Exception, detail:
641                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
642                                logging.error("Detail: %s" %detail)
643                                logging.info("Continue with next file")
644                                continue
645                        '''
646                        numfilesproc += 1
647               
648                       
649                logging.info("File renaming and converting completed")
650                logging.info(self.lineSeparator)
651               
652                       
653                return numfilesproc
654       
655       
656        def _getPostgresDBConnection(self):
657                '''
658                Get the default postgres DB connection - by reading in data from the db config file
659                '''
660               
661                logging.debug("Setting up connection to postgres DB")
662                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
663               
664               
665                self.pgc = pgc(configFile = self._databaseConfigurationFile)
666                logging.info("Postgres DB connection now set up")
667
668
669
670        def     _backupAndCleanData(self):
671                '''
672                Back up ingested data for specified data centre, then clean any of
673                the ingest dirs
674                '''
675                logging.info("Backing up ingested data")
676                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
677                  strftime("%y%m%d_%H%M") + "_originals/"
678               
679                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
680                logging.info("Data backed up - now clearing ingest directories")
681                #Clear out the original harvest records area and discovery dir
682                #FileUtilities.cleanDir(self.originals_dir)
683                #FileUtilities.cleanDir(self.discovery_dir)
684                logging.info("Ingest directories cleared")
685
686
687        def     _setupDataCentreDirs(self):
688                '''
689                Set up directories appropriate for the current data centre
690                '''
691                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
692               
693               
694                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
695               
696                # the following dirs define where the specific documents should go
697                self.originals_dir = data_dir + "/oai/originals/" # copy of original
698                self.discovery_dir = data_dir + "/discovery/"
699                #self.stubISO_dir = data_dir + "/stub_iso/"
700               
701                # Create/clear the 'in' and 'out' directories
702                FileUtilities.setUpDir(self.originals_dir)
703                FileUtilities.setUpDir(self.discovery_dir)
704               
705                logging.info("Ingest directories for data centre set up")
706
707
708        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
709                '''
710                Convert files from originals dir to discovery one, then
711                ingest, backup and clean
712                @param originals_dir: directory to ingest docs from
713                @param discovery_dir: directory to use to process ingested docs
714                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
715                '''
716               
717               
718                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
719               
720                filenames = os.listdir(self.discovery_dir)
721                               
722                #generate a list of files ALREADY in database so can compare with what has been ingested
723                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
724                               
725                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
726                filesPresentList=[]
727       
728               
729                if filePresentListArr:
730                        for fileArr in filePresentListArr:
731                                filesPresentList.append(fileArr[0])
732                               
733                                #TODO - is above relevant - duplicating functionlaity?
734       
735                #create list to to hold files ingest failed on.
736                self.updateFailList = []
737                self.deletedFailList = []                               
738               
739                for filename in filenames:
740                       
741                        fullPath = self.discovery_dir + filename
742                       
743                        if os.path.isfile(fullPath):
744                                       
745                                thisIngestedID = self.addFileToPostgresDB(fullPath)
746                               
747                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
748                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
749                                        if thisIngestedID in filesPresentList: 
750                                                filesPresentList.remove(thisIngestedID)                 
751                               
752                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
753                #will need to be removed.
754               
755                #only do this if not in single file mode (or else wverything else gets deleted!)               
756                if ((self.indFileToIngest == "") & (feed)):
757                        for item in filesPresentList:
758                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
759                                DeleteRecord(item)
760                                self.deletedFailList.append(item)
761                                self._no_files_deleted += 1
762                       
763                       
764                self._backupAndCleanData()
765               
766                #at this stage put the reporting code in (will work for oai or atom feed)
767                #create a summary file for each data centre ingest
768                data_dir = self._base_dir + "data/"
769                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
770                recOpFile = open(recOpFileName,'w')
771               
772                logging.info("oai_document_ingest processing complete:")
773               
774                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
775                message = "Ingest report for data centre: " + datacentre + "\n"
776                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
777                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
778                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
779                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
780                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
781                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
782                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
783               
784               
785                for badFile in self.updateFailList:
786                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")     
787                        message = message +"PROBLEM_FILE " + badFile + "\n"
788                         
789                recOpFile.write(message)
790                               
791                return numfilesproc, message
792
793
794
795        def _setupXQueries(self):
796                '''
797                Set up the required XQueries
798                - NB, extract the xquery libraries locally for easy reference
799                '''
800                self._xq = ndgResources()
801                for libFile in self._xq.xqlib:
802                        # NB, we don't want the full path to the files - just the filename
803                        fileName = libFile.split('/')[-1]
804                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.