source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 7759

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@7759
Revision 7759, 37.4 KB checked in by sdonegan, 10 years ago (diff)

Copys original metadata file into holding area so geonetworks can ingest and publish on CSW - only if successful ingest

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.ERROR
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False         
68                checkProcID = False
69                for arg in args:
70                       
71                        keywordArgs=['ingestFromDate','individualFile','interval','ingestFromDate',\
72                                                'eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile', \
73                                                'ingestProcessID']
74                                               
75                       
76                        bits = arg.split('=')
77                        if len(bits) == 2:
78                                if bits[0] == keywordArgs[3]:
79                                        self.setIngestFromDate(bits[1])
80                                elif bits[0] == keywordArgs[2]:
81                                        self.setPollInterval(bits[1])
82                                elif bits[0] == keywordArgs[7]:
83                                        print " - Running with specified config file!"
84                                        self.setOaiConfigFile(bits[1])
85                                        checkConfInArgs = True
86                                elif bits[0] == keywordArgs[1]:
87                                        print " - Running in single file ingestion mode!"
88                                        self.setIndFileToIngest(bits[1])
89                                elif bits[0] == keywordArgs[8]:
90                                        print " - WARNING: will log results of this ingestion process into the specified logging database! (NOTE: will also take location of metadata from database as this option intended for DPWS!!)"
91                                        self.setIngestLogID(bits[1])
92                                        checkProcID = True                             
93                                elif bits[0] not in keywordArgs:
94                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
95                                        sys.exit(2)
96                                else:
97                                        setattr(self, bits[0], bits[1])
98                       
99                if checkConfInArgs is False:
100                        print "\nWARNING: Ingest config file has not been specified. Using local default"
101                        self.setOaiConfigFile('./oai_document_ingester.config')
102                       
103               
104               
105                print self.lineSeparator
106               
107                # NB, this is a slight fudge as cannot get the detailed logging to work
108                # without setting up a new logger here - which means we get two loggers
109                # outputing data. The initial call to logging needs to be tracked down
110                # and configured correctly, so this can be avoided...
111#               self.logger = logging.getLogger()
112#               self.logger.setLevel(loggingLevel)
113
114                # create console handler and set level to debug
115#               ch = logging.StreamHandler()
116#               ch.setLevel(loggingLevel)
117                # create formatter
118#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
119                # add formatter to ch
120#               ch.setFormatter(formatter)
121                # add ch to logger
122#               self.logger.addHandler(ch)
123               
124                return args
125
126
127        def getID(self, filename):
128                '''
129                Gets the identifier out of an input metadata xml record.
130                @param filename - name of document file being processed
131                @return: ID - id to use to refer to the document
132                '''
133                logging.info("Retrieving identifier for metadata record " + filename)
134       
135                xml=file(filename).read()
136               
137                ID = idget(xml)
138               
139                return ID
140       
141       
142        def addFileToPostgresDB(self, filename):
143                '''
144                Add a file to the postgres DB - extracting and storing all the required
145                data in the process
146                @param filename: full path of file to add to postgres DB
147                '''
148                logging.info("Adding file, " + filename + ", to postgres DB")
149               
150                numSlash = len(filename.split('/'))
151                               
152                shortFilename = filename.split('/')[numSlash - 1]
153               
154                self.recordAttemptToIngestDiscoveryID = ""
155                self.ingestWarningMessage = ""
156               
157                self.ingestProblemMessage = ""
158               
159                #need to generate a local version of the iso data model (first call was to run through all available files
160                #and convert & place into relevant dirs as necessary
161                #generate the ISO object from the converted ISO here now..
162               
163                try:
164                                                                                                                                       
165                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
166                       
167                                                       
168                        if self.isoDataModel.createISOdataStructure() is True:
169                                logging.info("ISO extractor worked fine!")
170                       
171                       
172                        elif self.isoDataModel.createISOdataStructure() is True:
173                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
174                                                #sys.exit() # TODO fix these
175                                return
176                        else:
177                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
178                                                #sys.exit()
179                                return
180                except:
181                        logging.error("Could not generate ISO XML!")
182                        return
183               
184                #get the converted ISO into a local var
185                try:
186                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
187                        self.isoXML = file(filename).read()
188                        self.originalXMLdoc = self.isoXML
189                                       
190                except:
191                        logging.warn("Could not extract converted ISO xml to local variable!")
192                        self.isoXML = ''
193                               
194               
195                # first of all create a PostgresRecord - this object represents all the data required
196                # for a DB entry
197                dao = None
198               
199                try:
200                       
201                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
202                       
203                        #record whats bout to be ingested - if a problem this, error should already have been kicked out
204                        mappingIndex = self.isoDataModel.findTheListData(self.isoDataModel.datasetID)                                   
205                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[mappingIndex][0]
206                       
207                       
208                        if self.isoDataModel.validDoc is True:                 
209                               
210                               
211                                #record whats attempting to be ingested
212                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
213                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
214                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML, difXML = self.difXML, isoXML = self.isoXML)                   
215                               
216                                # Now create the data access object to interface to the DB                     
217                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc, dpwsID = self._DPWS_ID )
218                       
219                                # Finally, write the new record
220                                # 0 if failed, 1 if update, 2 if create                         
221                                returnCode = dao.createOrUpdateRecord()
222                                                               
223                                if dao.processingWarning != '':
224                                        self.ingestWarningMessage = dao.processingWarning
225                                                                               
226                                if returnCode == 2:
227                                        self._no_files_ingested += 1
228                               
229                                elif returnCode == 1:
230                                        self._no_files_changed += 1
231                                       
232                        else:
233                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
234                               
235                                logging.warn(self.ingestProblemMessage)
236                               
237                                #now raise error so can record this
238                                raise ValueError,self.ingestProblemMessage
239                       
240                except:
241                       
242                       
243                        #if error encountered, add to failure lisr
244                        logging.error("Could not update: " + filename)                 
245                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
246                        self.updateFailList.append(originalRecordFilename)
247                       
248                        logging.error("Exception thrown - detail: ")
249                        errors = sys.exc_info()
250                        logging.error(errors)
251                        self._error_messages += "%s\n" %str(errors[1])
252                       
253                        if dao:
254                                logging.info("Removing record and its associated info from DB")
255                                logging.info("- to allow clean ingestion on rerun")
256                                try:
257                                        dao.deleteOriginalRecord()
258                                except:
259                                        logging.error("Problem encountered when removing record: ")
260                                        logging.error(sys.exc_info())
261                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
262
263                        self._no_problem_files += 1
264                       
265                        logging.info("Continue processing other files")
266                       
267                return self.recordAttemptToIngestDiscoveryID
268                       
269       
270        def getProcessingConfig(self,configFilePath):
271               
272                '''
273                Fed up with messing about with hardcoded directory paths. 
274                This method to get relevant values out of the oai_document_ingester.config file
275               
276                Returns a dictionary of values with keys:
277               
278                #directory in which the code resides
279                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
280
281                #base directory in which metadata is extracted to and converted to
282                base_directory /home/badc/discovery_docs/ingestDocs/
283
284                #directory in which to write reports to
285                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
286
287                #path to the passwords file
288                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
289
290                #datacentre config file directory path
291                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
292                '''
293               
294                # Check this file exists; if not, assume an invalid datacentre has been specified
295               
296                if not os.path.isfile(configFilePath):
297                    sys.exit("ERROR: Could not find the processing config file")
298                   
299                processingConfig = {}
300                   
301                processing_config_file = open(configFilePath, "r")
302               
303                for line in processing_config_file.readlines():
304                        words  = string.split(line)
305                        if len(words) == 0:
306                                continue
307                        elif words[0] == 'code_directory':
308                                processingConfig['code_directory'] = words[1]
309                                                                                               
310                        elif words[0] == 'base_directory':
311                                processingConfig['base_directory'] = words[1]
312                               
313                        elif words[0] == 'reporting_directory':
314                                processingConfig['reporting_directory'] = words[1]
315                               
316                        elif words[0] == 'passwords_file':
317                                processingConfig['passwords_file'] = words[1]
318                               
319                        elif words[0] == 'datcentre_configs':
320                                processingConfig['datcentre_configs'] = words[1]
321                                       
322                        elif words[0] == 'changeURLs':
323                                processingConfig['changeURLs'] = words[1]
324                               
325                        elif words[0] == 'NDG_redirect_URL':
326                                processingConfig['NDG_redirect_URL'] = words[1]
327                               
328                        elif words[0] == 'ingestLoggingConfig':
329                                processingConfig['ingestLoggingConfig'] = words[1]
330                               
331                        elif words[0] == 'ingestConfig':
332                                if not os.path.isfile(words[1]):
333                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
334                                else:
335                                        processingConfig['ingestConfig'] = words[1]
336                               
337                        elif words[0] == 'acceptedFormats':
338                                processingConfig['acceptedFormats'] = words[1]
339                               
340                        elif words[0] == 'saxonJarFile':
341                                if not os.path.isfile(words[1]):
342                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
343                                else:
344                                        processingConfig['saxonJarFile'] = words[1]
345                        elif words[0] == 'xpathIsoClass':
346                               
347                                #build a dictionary with format as the key                             
348                                isoClass = words[1].split(":")                         
349                               
350                                if 'xpathIsoClass' not in processingConfig.keys():
351                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
352                                else:
353                                        temp = processingConfig['xpathIsoClass']                                       
354                                        temp.update({isoClass[0]:isoClass[1]})
355                                        processingConfig['xpathIsoClass'] = temp
356                               
357                        elif words[0] == 'xqueryStubIsoGen':
358                               
359                                #build a dictionary with format as the key
360                                isoClass = words[1].split(":")
361                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
362                               
363                        elif words[0] == 'xqueryConversions':
364                               
365                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
366                                processingConfig['xqueryConversions'] = words[1].split(",")
367                               
368                        elif words[0] == 'currentMEDIN':
369                               
370                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
371                                processingConfig['currentMEDIN'] = words[1]
372                               
373                        elif words[0] == 'exceptXqConv':
374                               
375                                #build a dictionary with format as the key
376                                xqClass = words[1].split(":")
377                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
378                               
379                        elif words[0] == 'xqDocTypes':
380                               
381                                #build a dictionary with format as the key
382                                xqClass = words[1].split(":")
383                               
384                                if 'xqDocTypes' not in processingConfig.keys():
385                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
386                                else:
387                                        temp = processingConfig['xqDocTypes']                                   
388                                        temp.update({xqClass[0]:xqClass[1]})
389                                        processingConfig['xqDocTypes'] = temp
390                                       
391                processing_config_file.close()
392                       
393                return processingConfig
394       
395       
396        def getConfigDetails(self, datacentre):
397                '''
398                Get the harvested records directory and groups for this datacentre from the
399                datacentre specific config file.  The harvested records directory depends on the
400                datacentres OAI base url, the set and format. These have to be know up-front.
401                The groups denote which 'portal groups' they belong to - for limiting searches to
402                say NERC-only datacentres records.
403                Groups are added to the intermediate MOLES when it is created.
404                @param datacentre: datacentre to use when looking up config file
405                '''
406                # initialise the variables to retrieve from the config file
407                self._harvest_home = ""
408                self._datacentre_groups = ""
409                self._datacentre_format = ""
410                self._datacentre_namespace = ""
411                self._NDG_dataProvider = False
412                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
413               
414               
415                #note changed to production buildout path
416                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
417               
418                #fed up with this rubbish causing problems - use a normal file opener.
419                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
420               
421               
422                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
423                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
424               
425                # Check this file exists; if not, assume an invalid datacentre has been specified
426                if not os.path.isfile(self._datacentre_config_filename):
427                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
428                        "specified (%s) is invalid\n" %datacentre)
429                   
430               
431                datacentre_config_file = open(self._datacentre_config_filename, "r")
432               
433                for line in datacentre_config_file.readlines():
434                        words  = string.split(line)
435                        if len(words) == 0:
436                                continue
437                        elif words[0] == 'host_path':
438                                self._harvest_home = words[1].rstrip()
439                        elif words[0] == 'groups':
440                                self._datacentre_groups = words[1:]
441                        elif words[0] == 'format':
442                                self._datacentre_format = words[1]
443                        elif words[0] == 'namespace':
444                                self._datacentre_namespace = words[1]
445                        elif words[0] == 'NDG_dataProvider':
446                                self._NDG_dataProvider = True
447                        elif words[0] == 'feed':
448                                self._IngestViaFeed = True
449                        elif words[0] == 'dpws_id':
450                                self._DPWS_ID = words[1]
451                        elif words[0] == 'csw_publish':
452                                self._PublishToCSW = words[1]
453                        elif words[0] == 'csw_publish_dir':
454                                self._AggregateDir = words[1]
455                       
456                datacentre_config_file.close()
457               
458                #stop ingest on this datacentre if process thread is OAI and the config says FEED
459                if ((self.processThread == 'OAI') and self._IngestViaFeed):
460                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
461                                               
462                if self._harvest_home == "":
463                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
464               
465                logging.info("harvested records are in " + self._harvest_home)
466               
467                if self._datacentre_groups == "":
468                    logging.info("No groups/keywords set for datacentre " + datacentre)
469                else:
470                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
471               
472                if self._datacentre_format == "":
473                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
474               
475                logging.info("format being harvested: " + self._datacentre_format)
476               
477                if self._datacentre_namespace == "":
478                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
479               
480                logging.info("datacentre namespace: " + self._datacentre_namespace)
481               
482                if self._DPWS_ID == "":
483                        sys.exit("No DPWS ID entered for this provider!")
484               
485                if self._NDG_dataProvider:
486                        logging.info("Datacentre classified as an NDG data provider")
487                else:
488                        logging.info("Datacentre is not classified as an NDG data provider")
489                logging.info(self.lineSeparator)
490               
491               
492        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
493                '''
494                Processes/renames the files (changed 08/01/07 to get id from inside file)
495                 - also replace any namespace declarations with a standard one which we know works in NDG
496                 NB, this copies files from the original dir to the discovery dir
497                 @param originals_dir: directory to convert files from
498                 @param discovery_dir: directory in which converted files will end up
499                 @return numfilesproc: counter of number of files processed
500                '''
501                numfilesproc = 0
502                badXMLfiles = 0         
503               
504                self.inputFileOrigFinal = {}
505                self.badXMLfileList = {}               
506                self.difXML = None
507               
508                #create dictionary for a mapping between original files and converted discovery files
509                self.originalsDiscoveryFilesMap = {}
510               
511                logging.info(self.lineSeparator)
512                logging.info("Renaming files:")
513               
514                if self.ingestProcessID is not None and self.procID is None:
515                       
516                        self._getPostgresDBConnectionLogging()
517                                               
518                        #if process id has been supplied, then update the logging db with a "start_ingest"
519                        sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
520                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
521                       
522                        sqlStatusCmd = "select update_ingest_info (%s, ' (preprocessing...)');" %(self.ingestProcessID)
523                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
524               
525                for filename in os.listdir(originals_dir):
526                       
527                        if not filename.endswith('.xml'):
528                                logging.warning('File %s is not xml format. Not processed'  %(filename))
529                                continue
530                       
531                        original_filename = originals_dir + filename
532                       
533                        #get the name of the file to be used in the xquery
534                        metadataFilename = filename.replace('.xml', '')
535                               
536                       
537                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
538                        if format == 'DIF_9.4':
539                                                       
540                                logging.info("Converting DIF to stub ISO...")                                   
541                               
542                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
543                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
544                                                                                                               
545                                #gets the metadata id from the xml
546                                metadataID=self.getID(original_filename)
547                                #import pdb
548                                #pdb.set_trace()
549                                                               
550                                repositoryName = self._datacentre_namespace
551                               
552                                                               
553                                #where is the file to be ingested located?
554                                metadataFileLoc = originals_dir
555                               
556                                #generate a new stubISO filename
557                                self.isoFormat= "stubISO"
558                                                               
559                               
560                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
561                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
562                                                                                                                       
563                                #get hold of the xml for the transformed ISO                           
564                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
565                                                               
566                                #create the iso file on the system
567                                tempStubISOfile = original_filename + ".stubISO.xml"
568                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
569                               
570                                #generate the ISO object from the converted ISO here now..
571                                try:
572                                       
573                                        #temp assignment so failure mode supported
574                                        new_filename_short = original_filename
575                                                                       
576                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
577                               
578                                        if isoDataModel.createISOdataStructure() is True:
579                                                logging.info("ISO extractor worked fine!")
580                                               
581                                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
582                                                self.originalXMLdoc = ''
583                               
584                                                try:
585                                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
586                                                        self.originalXMLdoc = file(original_filename).read()
587                                                        self.difXML = self.originalXMLdoc
588                                       
589                                                except:
590                                                        logging.warn("Could not extract original file to local variable!")
591                                                       
592                                               
593                                                #Back to main stream of ISO ingest
594                                                if self._NDG_dataProvider:
595                                       
596                                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
597                                                               
598                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
599                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
600                                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
601                               
602                                       
603                                                else:
604                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)
605                                                        ident = isoDataModel.datasetID[mappingIndex][0]
606                                       
607                                                        ident = ident.replace(":", "-")
608                                                        ident = ident.replace("/", "-")
609                               
610                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
611                                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
612                               
613                                       
614                               
615                                                #now create this stub ISO file on system so can access it
616                                                self.genIsoFile = discovery_dir + new_filename_short   
617                                                               
618                                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
619                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
620                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
621                                               
622                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
623                                                #this links a derived filename in processing dir with original filename
624                                                #get basic filename from the path+filename passed to this function to use as key               
625                                                self.inputFileOrigFinal[new_filename_short]=filename
626                                               
627                                                numfilesproc += 1
628                       
629                       
630                                        elif isoDataModel.createISOdataStructure() is True:
631                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
632                                                #sys.exit() # TODO fix these
633                                               
634                                        else:
635                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
636                                                badXMLfiles += 1
637                                                self.badXMLfileList[new_filename_short]=filename
638                                                                       
639                                except:
640                                        logging.error("Could not generate ISO XML from DIF format!!")
641                                        badXMLfiles += 1
642                                        self.badXMLfileList[new_filename_short]=filename
643                                       
644                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
645                               
646                       
647                        #all other ISO formats/profiles to deal with.
648                        else:                           
649                                isoFormat = format
650                               
651                                self.originalXMLdoc = "null"
652                               
653                               
654                                #generate the ISO object from the ISO xml..                                                                             
655                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
656                               
657                                if isoDataModel.createISOdataStructure() is True:
658                                                logging.info("ISO extractor worked fine!")
659                                               
660                                                #Back to main stream of ISO ingest
661                                                if self._NDG_dataProvider:
662                                                               
663                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
664                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
665                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
666                               
667                                                else:
668                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)     
669                                                        ident = isoDataModel.datasetID[mappingIndex][0]
670                                                       
671                                                        ident = ident.replace(":", "-")
672                                                        ident = ident.replace("/", "-")
673                               
674                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
675                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
676                               
677                                                #now create this stub ISO file on system so can access it
678                                                self.genIsoFile = discovery_dir + new_filename_short   
679                               
680                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
681                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
682                               
683                                                #get the converted ISO into a local var
684                                                try:
685                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
686                                                        self.isoXML = file(self.genIsoFile).read()
687                                                        self.originalXMLdoc = self.isoXML
688                                       
689                                                except:
690                                                        logging.warn("Could not extract converted ISO xml to local variable!")
691                                                        self.isoXML = ''
692                                                       
693                                                logging.info("original file = " + original_filename)
694                                                logging.info("newfile = " + new_filename)
695                               
696                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
697                                                #this links a derived filename in processing dir with original filename
698                                                #get basic filename from the path+filename passed to this function to use as key               
699                                                self.inputFileOrigFinal[new_filename_short]=filename
700                                               
701                                                numfilesproc += 1
702                                                                                               
703                                elif isoDataModel.createISOdataStructure() is True:
704                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
705                                                #sys.exit() # TODO fix these
706                                       
707                                else:                                   
708                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
709                                        badXMLfiles += 1
710                                        self.badXMLfileList[new_filename_short]=filename
711                               
712                       
713                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
714                       
715                logging.info("File renaming and converting completed")
716                logging.info(self.lineSeparator)
717               
718                       
719                return numfilesproc,badXMLfiles
720       
721       
722        def _getPostgresDBConnection(self):
723                '''
724                Get the default postgres DB connection - by reading in data from the db config file
725                '''
726               
727                logging.debug("Setting up connection to postgres DB")
728                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
729               
730               
731                self.pgc = pgc(configFile = self._databaseConfigurationFile)
732                logging.info("Postgres DB connection now set up")
733               
734               
735       
736        def _getPostgresDBConnectionLogging(self):
737                '''
738                Get the default postgres DB connection - by reading in data from the db config file
739                '''
740               
741                logging.debug("Setting up connection to postgres DB")
742                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
743               
744               
745                self.pgc_IngestLog = pgc(configFile = self.ingestLoggingDatabaseConfigurationFile)
746                logging.info("Postgres DB connection now set up")
747
748
749
750        def     _backupAndCleanData(self):
751                '''
752                Back up ingested data for specified data centre, then clean any of
753                the ingest dirs
754                '''
755                logging.info("Backing up ingested data")
756                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
757                  strftime("%y%m%d_%H%M") + "_originals/"
758               
759                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
760                logging.info("Data backed up - now clearing ingest directories")
761                #Clear out the original harvest records area and discovery dir
762                #FileUtilities.cleanDir(self.originals_dir)
763                #FileUtilities.cleanDir(self.discovery_dir)
764                logging.info("Ingest directories cleared")
765
766
767        def     _setupDataCentreDirs(self):
768                '''
769                Set up directories appropriate for the current data centre
770                '''
771                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
772               
773               
774                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
775               
776                # the following dirs define where the specific documents should go
777                self.originals_dir = data_dir + "/oai/originals/" # copy of original
778                self.discovery_dir = data_dir + "/discovery/"
779                #self.stubISO_dir = data_dir + "/stub_iso/"
780               
781                # Create/clear the 'in' and 'out' directories
782                FileUtilities.setUpDir(self.originals_dir)
783                FileUtilities.setUpDir(self.discovery_dir)
784               
785                logging.info("Ingest directories for data centre set up")
786
787
788        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
789                '''
790                Convert files from originals dir to discovery one, then
791                ingest, backup and clean
792                @param originals_dir: directory to ingest docs from
793                @param discovery_dir: directory to use to process ingested docs
794                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
795                '''
796               
797                self.thisIngestStartDate = str(datetime.datetime.now())
798               
799                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
800               
801                filenames = os.listdir(self.discovery_dir)
802               
803                ingestProbMsg = ""
804                ingestWarnMsg = ""
805                               
806                #generate a list of files ALREADY in database so can compare with what has been ingested
807                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
808                               
809                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
810                filesPresentList=[]
811       
812               
813                if filePresentListArr:
814                        for fileArr in filePresentListArr:
815                                filesPresentList.append(fileArr[0])
816                               
817                                #TODO - is above relevant - duplicating functionlaity?
818       
819                #create list to to hold files ingest failed on.
820                self.updateFailList = []
821                self.deletedFailList = []
822                self.problemMessageList = {}   
823                self.warnMessageList = {}
824               
825                counter = 1
826                warningCounter = 0
827                               
828                for filename in filenames:
829                       
830                        fullPath = self.discovery_dir + filename
831                       
832                        #if process id has been supplied, then update the logging db with a "start_ingest"
833                        if self.ingestProcessID is not None and self.procID is None:
834                                sqlStatusCmd = "select update_ingest_info (%s, ' (ingesting %s of %s)');" %(self.ingestProcessID,counter,numfilesproc)                 
835                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
836                       
837                       
838                        if os.path.isfile(fullPath):
839                                       
840                                thisIngestedID = self.addFileToPostgresDB(fullPath)
841                               
842                                #record all problem messages
843                                #ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
844                                if self.ingestProblemMessage != '':
845                                        ingestProbMsg = ingestProbMsg + self.ingestProblemMessage                                       
846                                        self.problemMessageList[self.originalsDiscoveryFilesMap[filename]] = self.ingestProblemMessage
847                                       
848                                else:
849                                       
850                                        #at this point, check to see if record can be aggregated for publishing to CSW?
851                                        if self._PublishToCSW == "True":
852                                               
853                                                #copy file to specified aggregation directory
854                                                FileUtilities.copyFileToDir(fullPath, self._AggregateDir)
855                                       
856                                if self.ingestWarningMessage != '':
857                                        warningCounter += 1
858                                        ingestWarnMsg = ingestWarnMsg + self.ingestWarningMessage
859                                        self.warnMessageList[filename] = self.ingestWarningMessage
860                                       
861                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
862                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
863                                        if thisIngestedID in filesPresentList: 
864                                                filesPresentList.remove(thisIngestedID)
865                       
866                       
867                        counter += 1   
868               
869                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
870                #will need to be removed.
871               
872                #only do this if not in single file mode (or else wverything else gets deleted!)               
873                if ((self.indFileToIngest == "") & (feed)):
874                        for item in filesPresentList:
875                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
876                                DeleteRecord(item)
877                                self.deletedFailList.append(item)
878                                self._no_files_deleted += 1
879                       
880                       
881                self._backupAndCleanData()
882               
883                #at this stage put the reporting code in (will work for oai or atom feed)
884                #create a summary file for each data centre ingest
885                data_dir = self._base_dir + "data/"
886                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
887                recOpFile = open(recOpFileName,'w')
888               
889                logging.info("oai_document_ingest processing complete:")
890               
891                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
892                self.thisIngestEndDate = str(datetime.datetime.now())
893               
894                messageSeparator = "\n\n********************************************************************************"
895               
896                message = "Ingest report for data centre: " + datacentre + "\n"
897                message = message + "Ingest date: " + self.thisIngestEndDate + "\n"
898                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
899                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
900                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
901                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
902                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
903                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
904                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
905               
906                if warningCounter > 0:
907                        message = message + messageSeparator
908                        message = message + "\nINGEST WARNINGS:  " + str(warningCounter) + "\n"
909                       
910                        message = message +  "\nBelow are the files that had ingest warnings...\n\n"
911                       
912                        for warnFile in self.warnMessageList.keys():
913                                message = message + "\nWARNING_FILE " + self.originalsDiscoveryFilesMap[warnFile] + "    " + self.warnMessageList[warnFile] + "\n"
914                       
915               
916                if self._no_problem_files > 0: 
917                        message = message + messageSeparator
918                        message = message + "\nPROBLEM_FILES: " + str(self._no_problem_files)  + "\n"
919                       
920               
921                if len(self.updateFailList) > 0:
922                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
923                                       
924                        for badFile in self.updateFailList:
925                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
926                               
927                                #NOTE: there may not be a problem message for every file that failed to ingest..
928                                if badFile in self.problemMessageList.keys():
929                                        message = message +"\nPROBLEM_FILE " + badFile + "\n"
930                                        message = message + self.problemMessageList[badFile] + "\n"
931                               
932               
933                if len(self.badXMLfileList.keys()) > 0:
934                        message = message + messageSeparator
935                        message = message +  "\nBelow are the files that could not be parsed (i.e. bad XML?)...\n\n"
936                        for badXmlFile in self.badXMLfileList.values():
937                                message = message + "BAD_XML_FILE: " + badXmlFile +"\n"
938               
939                                               
940                #log processing results to ingestLogging table if required to           
941               
942                if self.ingestProcessID is not None:
943                        logging.info("Updating ingest logging database")
944                                               
945                        sqlLoggingCmd = "select add_ingest_stats(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s');"%(self.ingestProcessID,self.thisIngestStartDate,self.thisIngestEndDate,numfilesproc+badXmlFiles,numfilesproc,badXmlFiles,self._no_files_ingested,self._no_files_changed,self._no_files_deleted,self._no_problem_files)
946                                       
947                        #update ingestLogging                   
948                        self.pgc_IngestLog.runSQLCommand(sqlLoggingCmd)
949                       
950                        #now update problem file log if there are problem files
951                        if self._no_problem_files > 0:
952                                logging.info("There are %s problem files - recording these in stats logging db" %self._no_problem_files)
953                               
954                                #note sometimes message will not always be recorded, so just record name of file
955                                if len(self.problemMessageList.keys()) < 1:
956                                       
957                                        for badFile in self.updateFailList:
958                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,'Could not catch precise error for this file!!')
959                               
960                                                #update ingestLogging           
961                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)
962                               
963                                else:
964                                        for badFile in self.problemMessageList.keys():                                 
965                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,self.problemMessageList[badFile])
966                                                       
967                                                #update ingestLogging                   
968                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)               
969                               
970                                       
971                        #if process id has been supplied, then update the logging db with a "start_ingest" (only for DPWS ops)
972                        if self.procID is None:
973                                sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
974                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
975                       
976                #update with content from the problem messages recorded
977                #message = message + ingestProbMsg + "\n"               
978                         
979                recOpFile.write(message)
980                               
981                return numfilesproc,badXmlFiles, message
982
983
984
985        def _setupXQueries(self):
986                '''
987                Set up the required XQueries
988                - NB, extract the xquery libraries locally for easy reference
989                '''
990                self._xq = ndgResources()
991                for libFile in self._xq.xqlib:
992                        # NB, we don't want the full path to the files - just the filename
993                        fileName = libFile.split('/')[-1]
994                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.