source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 7725

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@7725
Revision 7725, 37.0 KB checked in by sdonegan, 10 years ago (diff)

Updates to allow proper handling of NERC DMS format and uprated dif2iso conversion

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.ERROR
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False         
68                checkProcID = False
69                for arg in args:
70                       
71                        keywordArgs=['ingestFromDate','individualFile','interval','ingestFromDate',\
72                                                'eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile', \
73                                                'ingestProcessID']
74                                               
75                       
76                        bits = arg.split('=')
77                        if len(bits) == 2:
78                                if bits[0] == keywordArgs[3]:
79                                        self.setIngestFromDate(bits[1])
80                                elif bits[0] == keywordArgs[2]:
81                                        self.setPollInterval(bits[1])
82                                elif bits[0] == keywordArgs[7]:
83                                        print " - Running with specified config file!"
84                                        self.setOaiConfigFile(bits[1])
85                                        checkConfInArgs = True
86                                elif bits[0] == keywordArgs[1]:
87                                        print " - Running in single file ingestion mode!"
88                                        self.setIndFileToIngest(bits[1])
89                                elif bits[0] == keywordArgs[8]:
90                                        print " - WARNING: will log results of this ingestion process into the specified logging database! (NOTE: will also take location of metadata from database as this option intended for DPWS!!)"
91                                        self.setIngestLogID(bits[1])
92                                        checkProcID = True                             
93                                elif bits[0] not in keywordArgs:
94                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
95                                        sys.exit(2)
96                                else:
97                                        setattr(self, bits[0], bits[1])
98                       
99                if checkConfInArgs is False:
100                        print "\nWARNING: Ingest config file has not been specified. Using local default"
101                        self.setOaiConfigFile('./oai_document_ingester.config')
102                       
103               
104               
105                print self.lineSeparator
106               
107                # NB, this is a slight fudge as cannot get the detailed logging to work
108                # without setting up a new logger here - which means we get two loggers
109                # outputing data. The initial call to logging needs to be tracked down
110                # and configured correctly, so this can be avoided...
111#               self.logger = logging.getLogger()
112#               self.logger.setLevel(loggingLevel)
113
114                # create console handler and set level to debug
115#               ch = logging.StreamHandler()
116#               ch.setLevel(loggingLevel)
117                # create formatter
118#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
119                # add formatter to ch
120#               ch.setFormatter(formatter)
121                # add ch to logger
122#               self.logger.addHandler(ch)
123               
124                return args
125
126
127        def getID(self, filename):
128                '''
129                Gets the identifier out of an input metadata xml record.
130                @param filename - name of document file being processed
131                @return: ID - id to use to refer to the document
132                '''
133                logging.info("Retrieving identifier for metadata record " + filename)
134       
135                xml=file(filename).read()
136               
137                ID = idget(xml)
138               
139                return ID
140       
141       
142        def addFileToPostgresDB(self, filename):
143                '''
144                Add a file to the postgres DB - extracting and storing all the required
145                data in the process
146                @param filename: full path of file to add to postgres DB
147                '''
148                logging.info("Adding file, " + filename + ", to postgres DB")
149               
150                numSlash = len(filename.split('/'))
151                               
152                shortFilename = filename.split('/')[numSlash - 1]
153               
154                self.recordAttemptToIngestDiscoveryID = ""
155                self.ingestWarningMessage = ""
156               
157                self.ingestProblemMessage = ""
158               
159                #need to generate a local version of the iso data model (first call was to run through all available files
160                #and convert & place into relevant dirs as necessary
161                #generate the ISO object from the converted ISO here now..
162               
163                try:
164                                                                                                                                       
165                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
166                       
167                                                       
168                        if self.isoDataModel.createISOdataStructure() is True:
169                                logging.info("ISO extractor worked fine!")
170                       
171                       
172                        elif self.isoDataModel.createISOdataStructure() is True:
173                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
174                                                #sys.exit() # TODO fix these
175                                return
176                        else:
177                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
178                                                #sys.exit()
179                                return
180                except:
181                        logging.error("Could not generate ISO XML!")
182                        return
183               
184                #get the converted ISO into a local var
185                try:
186                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
187                        self.isoXML = file(filename).read()
188                        self.originalXMLdoc = self.isoXML
189                                       
190                except:
191                        logging.warn("Could not extract converted ISO xml to local variable!")
192                        self.isoXML = ''
193                               
194               
195                # first of all create a PostgresRecord - this object represents all the data required
196                # for a DB entry
197                dao = None
198               
199                try:
200                       
201                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
202                       
203                        #record whats bout to be ingested - if a problem this, error should already have been kicked out
204                        mappingIndex = self.isoDataModel.findTheListData(self.isoDataModel.datasetID)                                   
205                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[mappingIndex][0]
206                       
207                       
208                        if self.isoDataModel.validDoc is True:                 
209                               
210                               
211                                #record whats attempting to be ingested
212                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
213                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
214                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML, difXML = self.difXML, isoXML = self.isoXML)                   
215                               
216                                # Now create the data access object to interface to the DB                     
217                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc, dpwsID = self._DPWS_ID )
218                       
219                                # Finally, write the new record
220                                # 0 if failed, 1 if update, 2 if create                         
221                                returnCode = dao.createOrUpdateRecord()
222                                                               
223                                if dao.processingWarning != '':
224                                        self.ingestWarningMessage = dao.processingWarning
225                                                                               
226                                if returnCode == 2:
227                                        self._no_files_ingested += 1
228                               
229                                elif returnCode == 1:
230                                        self._no_files_changed += 1
231                                       
232                        else:
233                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
234                               
235                                logging.warn(self.ingestProblemMessage)
236                               
237                                #now raise error so can record this
238                                raise ValueError,self.ingestProblemMessage
239                       
240                except:
241                       
242                       
243                        #if error encountered, add to failure lisr
244                        logging.error("Could not update: " + filename)                 
245                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
246                        self.updateFailList.append(originalRecordFilename)
247                       
248                        logging.error("Exception thrown - detail: ")
249                        errors = sys.exc_info()
250                        logging.error(errors)
251                        self._error_messages += "%s\n" %str(errors[1])
252                       
253                        if dao:
254                                logging.info("Removing record and its associated info from DB")
255                                logging.info("- to allow clean ingestion on rerun")
256                                try:
257                                        dao.deleteOriginalRecord()
258                                except:
259                                        logging.error("Problem encountered when removing record: ")
260                                        logging.error(sys.exc_info())
261                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
262
263                        self._no_problem_files += 1
264                       
265                        logging.info("Continue processing other files")
266                       
267                return self.recordAttemptToIngestDiscoveryID
268                       
269       
270        def getProcessingConfig(self,configFilePath):
271               
272                '''
273                Fed up with messing about with hardcoded directory paths. 
274                This method to get relevant values out of the oai_document_ingester.config file
275               
276                Returns a dictionary of values with keys:
277               
278                #directory in which the code resides
279                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
280
281                #base directory in which metadata is extracted to and converted to
282                base_directory /home/badc/discovery_docs/ingestDocs/
283
284                #directory in which to write reports to
285                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
286
287                #path to the passwords file
288                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
289
290                #datacentre config file directory path
291                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
292                '''
293               
294                # Check this file exists; if not, assume an invalid datacentre has been specified
295               
296                if not os.path.isfile(configFilePath):
297                    sys.exit("ERROR: Could not find the processing config file")
298                   
299                processingConfig = {}
300                   
301                processing_config_file = open(configFilePath, "r")
302               
303                for line in processing_config_file.readlines():
304                        words  = string.split(line)
305                        if len(words) == 0:
306                                continue
307                        elif words[0] == 'code_directory':
308                                processingConfig['code_directory'] = words[1]
309                                                                                               
310                        elif words[0] == 'base_directory':
311                                processingConfig['base_directory'] = words[1]
312                               
313                        elif words[0] == 'reporting_directory':
314                                processingConfig['reporting_directory'] = words[1]
315                               
316                        elif words[0] == 'passwords_file':
317                                processingConfig['passwords_file'] = words[1]
318                               
319                        elif words[0] == 'datcentre_configs':
320                                processingConfig['datcentre_configs'] = words[1]
321                                       
322                        elif words[0] == 'changeURLs':
323                                processingConfig['changeURLs'] = words[1]
324                               
325                        elif words[0] == 'NDG_redirect_URL':
326                                processingConfig['NDG_redirect_URL'] = words[1]
327                               
328                        elif words[0] == 'ingestLoggingConfig':
329                                processingConfig['ingestLoggingConfig'] = words[1]
330                               
331                        elif words[0] == 'ingestConfig':
332                                if not os.path.isfile(words[1]):
333                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
334                                else:
335                                        processingConfig['ingestConfig'] = words[1]
336                               
337                        elif words[0] == 'acceptedFormats':
338                                processingConfig['acceptedFormats'] = words[1]
339                               
340                        elif words[0] == 'saxonJarFile':
341                                if not os.path.isfile(words[1]):
342                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
343                                else:
344                                        processingConfig['saxonJarFile'] = words[1]
345                        elif words[0] == 'xpathIsoClass':
346                               
347                                #build a dictionary with format as the key                             
348                                isoClass = words[1].split(":")                         
349                               
350                                if 'xpathIsoClass' not in processingConfig.keys():
351                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
352                                else:
353                                        temp = processingConfig['xpathIsoClass']                                       
354                                        temp.update({isoClass[0]:isoClass[1]})
355                                        processingConfig['xpathIsoClass'] = temp
356                               
357                        elif words[0] == 'xqueryStubIsoGen':
358                               
359                                #build a dictionary with format as the key
360                                isoClass = words[1].split(":")
361                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
362                               
363                        elif words[0] == 'xqueryConversions':
364                               
365                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
366                                processingConfig['xqueryConversions'] = words[1].split(",")
367                               
368                        elif words[0] == 'currentMEDIN':
369                               
370                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
371                                processingConfig['currentMEDIN'] = words[1]
372                               
373                        elif words[0] == 'exceptXqConv':
374                               
375                                #build a dictionary with format as the key
376                                xqClass = words[1].split(":")
377                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
378                               
379                        elif words[0] == 'xqDocTypes':
380                               
381                                #build a dictionary with format as the key
382                                xqClass = words[1].split(":")
383                               
384                                if 'xqDocTypes' not in processingConfig.keys():
385                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
386                                else:
387                                        temp = processingConfig['xqDocTypes']                                   
388                                        temp.update({xqClass[0]:xqClass[1]})
389                                        processingConfig['xqDocTypes'] = temp
390                                       
391                processing_config_file.close()
392                       
393                return processingConfig
394       
395       
396        def getConfigDetails(self, datacentre):
397                '''
398                Get the harvested records directory and groups for this datacentre from the
399                datacentre specific config file.  The harvested records directory depends on the
400                datacentres OAI base url, the set and format. These have to be know up-front.
401                The groups denote which 'portal groups' they belong to - for limiting searches to
402                say NERC-only datacentres records.
403                Groups are added to the intermediate MOLES when it is created.
404                @param datacentre: datacentre to use when looking up config file
405                '''
406                # initialise the variables to retrieve from the config file
407                self._harvest_home = ""
408                self._datacentre_groups = ""
409                self._datacentre_format = ""
410                self._datacentre_namespace = ""
411                self._NDG_dataProvider = False
412                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
413               
414               
415                #note changed to production buildout path
416                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
417               
418                #fed up with this rubbish causing problems - use a normal file opener.
419                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
420               
421               
422                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
423                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
424               
425                # Check this file exists; if not, assume an invalid datacentre has been specified
426                if not os.path.isfile(self._datacentre_config_filename):
427                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
428                        "specified (%s) is invalid\n" %datacentre)
429                   
430               
431                datacentre_config_file = open(self._datacentre_config_filename, "r")
432               
433                for line in datacentre_config_file.readlines():
434                        words  = string.split(line)
435                        if len(words) == 0:
436                                continue
437                        elif words[0] == 'host_path':
438                                self._harvest_home = words[1].rstrip()
439                        elif words[0] == 'groups':
440                                self._datacentre_groups = words[1:]
441                        elif words[0] == 'format':
442                                self._datacentre_format = words[1]
443                        elif words[0] == 'namespace':
444                                self._datacentre_namespace = words[1]
445                        elif words[0] == 'NDG_dataProvider':
446                                self._NDG_dataProvider = True
447                        elif words[0] == 'feed':
448                                self._IngestViaFeed = True
449                        elif words[0] == 'dpws_id':
450                                self._DPWS_ID = words[1]
451                       
452                datacentre_config_file.close()
453               
454                #stop ingest on this datacentre if process thread is OAI and the config says FEED
455                if ((self.processThread == 'OAI') and self._IngestViaFeed):
456                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
457                                               
458                if self._harvest_home == "":
459                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
460               
461                logging.info("harvested records are in " + self._harvest_home)
462               
463                if self._datacentre_groups == "":
464                    logging.info("No groups/keywords set for datacentre " + datacentre)
465                else:
466                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
467               
468                if self._datacentre_format == "":
469                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
470               
471                logging.info("format being harvested: " + self._datacentre_format)
472               
473                if self._datacentre_namespace == "":
474                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
475               
476                logging.info("datacentre namespace: " + self._datacentre_namespace)
477               
478                if self._DPWS_ID == "":
479                        sys.exit("No DPWS ID entered for this provider!")
480               
481                if self._NDG_dataProvider:
482                        logging.info("Datacentre classified as an NDG data provider")
483                else:
484                        logging.info("Datacentre is not classified as an NDG data provider")
485                logging.info(self.lineSeparator)
486               
487               
488        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
489                '''
490                Processes/renames the files (changed 08/01/07 to get id from inside file)
491                 - also replace any namespace declarations with a standard one which we know works in NDG
492                 NB, this copies files from the original dir to the discovery dir
493                 @param originals_dir: directory to convert files from
494                 @param discovery_dir: directory in which converted files will end up
495                 @return numfilesproc: counter of number of files processed
496                '''
497                numfilesproc = 0
498                badXMLfiles = 0         
499               
500                self.inputFileOrigFinal = {}
501                self.badXMLfileList = {}               
502                self.difXML = None
503               
504                #create dictionary for a mapping between original files and converted discovery files
505                self.originalsDiscoveryFilesMap = {}
506               
507                logging.info(self.lineSeparator)
508                logging.info("Renaming files:")
509               
510                if self.ingestProcessID is not None and self.procID is None:
511                       
512                        self._getPostgresDBConnectionLogging()
513                                               
514                        #if process id has been supplied, then update the logging db with a "start_ingest"
515                        sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
516                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
517                       
518                        sqlStatusCmd = "select update_ingest_info (%s, ' (preprocessing...)');" %(self.ingestProcessID)
519                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
520               
521                for filename in os.listdir(originals_dir):
522                       
523                        if not filename.endswith('.xml'):
524                                logging.warning('File %s is not xml format. Not processed'  %(filename))
525                                continue
526                       
527                        original_filename = originals_dir + filename
528                       
529                        #get the name of the file to be used in the xquery
530                        metadataFilename = filename.replace('.xml', '')
531                               
532                       
533                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
534                        if format == 'DIF_9.4':
535                                                       
536                                logging.info("Converting DIF to stub ISO...")                                   
537                               
538                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
539                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
540                                                                                                               
541                                #gets the metadata id from the xml
542                                metadataID=self.getID(original_filename)
543                                #import pdb
544                                #pdb.set_trace()
545                                                               
546                                repositoryName = self._datacentre_namespace
547                               
548                                                               
549                                #where is the file to be ingested located?
550                                metadataFileLoc = originals_dir
551                               
552                                #generate a new stubISO filename
553                                self.isoFormat= "stubISO"
554                                                               
555                               
556                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
557                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
558                                                                                                                       
559                                #get hold of the xml for the transformed ISO                           
560                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
561                                                               
562                                #create the iso file on the system
563                                tempStubISOfile = original_filename + ".stubISO.xml"
564                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
565                               
566                                #generate the ISO object from the converted ISO here now..
567                                try:
568                                       
569                                        #temp assignment so failure mode supported
570                                        new_filename_short = original_filename
571                                                                       
572                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
573                               
574                                        if isoDataModel.createISOdataStructure() is True:
575                                                logging.info("ISO extractor worked fine!")
576                                               
577                                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
578                                                self.originalXMLdoc = ''
579                               
580                                                try:
581                                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
582                                                        self.originalXMLdoc = file(original_filename).read()
583                                                        self.difXML = self.originalXMLdoc
584                                       
585                                                except:
586                                                        logging.warn("Could not extract original file to local variable!")
587                                                       
588                                               
589                                                #Back to main stream of ISO ingest
590                                                if self._NDG_dataProvider:
591                                       
592                                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
593                                                               
594                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
595                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
596                                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
597                               
598                                       
599                                                else:
600                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)
601                                                        ident = isoDataModel.datasetID[mappingIndex][0]
602                                       
603                                                        ident = ident.replace(":", "-")
604                                                        ident = ident.replace("/", "-")
605                               
606                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
607                                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
608                               
609                                       
610                               
611                                                #now create this stub ISO file on system so can access it
612                                                self.genIsoFile = discovery_dir + new_filename_short   
613                                                               
614                                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
615                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
616                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
617                                               
618                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
619                                                #this links a derived filename in processing dir with original filename
620                                                #get basic filename from the path+filename passed to this function to use as key               
621                                                self.inputFileOrigFinal[new_filename_short]=filename
622                                               
623                                                numfilesproc += 1
624                       
625                       
626                                        elif isoDataModel.createISOdataStructure() is True:
627                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
628                                                #sys.exit() # TODO fix these
629                                               
630                                        else:
631                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
632                                                badXMLfiles += 1
633                                                self.badXMLfileList[new_filename_short]=filename
634                                                                       
635                                except:
636                                        logging.error("Could not generate ISO XML from DIF format!!")
637                                        badXMLfiles += 1
638                                        self.badXMLfileList[new_filename_short]=filename
639                                       
640                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
641                               
642                       
643                        #all other ISO formats/profiles to deal with.
644                        else:                           
645                                isoFormat = format
646                               
647                                self.originalXMLdoc = "null"
648                               
649                               
650                                #generate the ISO object from the ISO xml..                                                                             
651                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
652                               
653                                if isoDataModel.createISOdataStructure() is True:
654                                                logging.info("ISO extractor worked fine!")
655                                               
656                                                #Back to main stream of ISO ingest
657                                                if self._NDG_dataProvider:
658                                                               
659                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
660                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
661                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
662                               
663                                                else:
664                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)     
665                                                        ident = isoDataModel.datasetID[mappingIndex][0]
666                                                       
667                                                        ident = ident.replace(":", "-")
668                                                        ident = ident.replace("/", "-")
669                               
670                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
671                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
672                               
673                                                #now create this stub ISO file on system so can access it
674                                                self.genIsoFile = discovery_dir + new_filename_short   
675                               
676                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
677                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
678                               
679                                                #get the converted ISO into a local var
680                                                try:
681                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
682                                                        self.isoXML = file(self.genIsoFile).read()
683                                                        self.originalXMLdoc = self.isoXML
684                                       
685                                                except:
686                                                        logging.warn("Could not extract converted ISO xml to local variable!")
687                                                        self.isoXML = ''
688                                                       
689                                                logging.info("original file = " + original_filename)
690                                                logging.info("newfile = " + new_filename)
691                               
692                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
693                                                #this links a derived filename in processing dir with original filename
694                                                #get basic filename from the path+filename passed to this function to use as key               
695                                                self.inputFileOrigFinal[new_filename_short]=filename
696                                               
697                                                numfilesproc += 1
698                                                                                               
699                                elif isoDataModel.createISOdataStructure() is True:
700                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
701                                                #sys.exit() # TODO fix these
702                                       
703                                else:                                   
704                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
705                                        badXMLfiles += 1
706                                        self.badXMLfileList[new_filename_short]=filename
707                               
708                       
709                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
710                       
711                logging.info("File renaming and converting completed")
712                logging.info(self.lineSeparator)
713               
714                       
715                return numfilesproc,badXMLfiles
716       
717       
718        def _getPostgresDBConnection(self):
719                '''
720                Get the default postgres DB connection - by reading in data from the db config file
721                '''
722               
723                logging.debug("Setting up connection to postgres DB")
724                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
725               
726               
727                self.pgc = pgc(configFile = self._databaseConfigurationFile)
728                logging.info("Postgres DB connection now set up")
729               
730               
731       
732        def _getPostgresDBConnectionLogging(self):
733                '''
734                Get the default postgres DB connection - by reading in data from the db config file
735                '''
736               
737                logging.debug("Setting up connection to postgres DB")
738                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
739               
740               
741                self.pgc_IngestLog = pgc(configFile = self.ingestLoggingDatabaseConfigurationFile)
742                logging.info("Postgres DB connection now set up")
743
744
745
746        def     _backupAndCleanData(self):
747                '''
748                Back up ingested data for specified data centre, then clean any of
749                the ingest dirs
750                '''
751                logging.info("Backing up ingested data")
752                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
753                  strftime("%y%m%d_%H%M") + "_originals/"
754               
755                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
756                logging.info("Data backed up - now clearing ingest directories")
757                #Clear out the original harvest records area and discovery dir
758                #FileUtilities.cleanDir(self.originals_dir)
759                #FileUtilities.cleanDir(self.discovery_dir)
760                logging.info("Ingest directories cleared")
761
762
763        def     _setupDataCentreDirs(self):
764                '''
765                Set up directories appropriate for the current data centre
766                '''
767                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
768               
769               
770                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
771               
772                # the following dirs define where the specific documents should go
773                self.originals_dir = data_dir + "/oai/originals/" # copy of original
774                self.discovery_dir = data_dir + "/discovery/"
775                #self.stubISO_dir = data_dir + "/stub_iso/"
776               
777                # Create/clear the 'in' and 'out' directories
778                FileUtilities.setUpDir(self.originals_dir)
779                FileUtilities.setUpDir(self.discovery_dir)
780               
781                logging.info("Ingest directories for data centre set up")
782
783
784        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
785                '''
786                Convert files from originals dir to discovery one, then
787                ingest, backup and clean
788                @param originals_dir: directory to ingest docs from
789                @param discovery_dir: directory to use to process ingested docs
790                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
791                '''
792               
793                self.thisIngestStartDate = str(datetime.datetime.now())
794               
795                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
796               
797                filenames = os.listdir(self.discovery_dir)
798               
799                ingestProbMsg = ""
800                ingestWarnMsg = ""
801                               
802                #generate a list of files ALREADY in database so can compare with what has been ingested
803                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
804                               
805                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
806                filesPresentList=[]
807       
808               
809                if filePresentListArr:
810                        for fileArr in filePresentListArr:
811                                filesPresentList.append(fileArr[0])
812                               
813                                #TODO - is above relevant - duplicating functionlaity?
814       
815                #create list to to hold files ingest failed on.
816                self.updateFailList = []
817                self.deletedFailList = []
818                self.problemMessageList = {}   
819                self.warnMessageList = {}
820               
821                counter = 1
822                warningCounter = 0
823                               
824                for filename in filenames:
825                       
826                        fullPath = self.discovery_dir + filename
827                       
828                        #if process id has been supplied, then update the logging db with a "start_ingest"
829                        if self.ingestProcessID is not None and self.procID is None:
830                                sqlStatusCmd = "select update_ingest_info (%s, ' (ingesting %s of %s)');" %(self.ingestProcessID,counter,numfilesproc)                 
831                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
832                       
833                       
834                        if os.path.isfile(fullPath):
835                                       
836                                thisIngestedID = self.addFileToPostgresDB(fullPath)
837                               
838                                #record all problem messages
839                                #ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
840                                if self.ingestProblemMessage != '':
841                                        ingestProbMsg = ingestProbMsg + self.ingestProblemMessage                                       
842                                        self.problemMessageList[self.originalsDiscoveryFilesMap[filename]] = self.ingestProblemMessage
843                                       
844                                if self.ingestWarningMessage != '':
845                                        warningCounter += 1
846                                        ingestWarnMsg = ingestWarnMsg + self.ingestWarningMessage
847                                        self.warnMessageList[filename] = self.ingestWarningMessage
848                                       
849                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
850                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
851                                        if thisIngestedID in filesPresentList: 
852                                                filesPresentList.remove(thisIngestedID)
853                       
854                       
855                        counter += 1   
856               
857                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
858                #will need to be removed.
859               
860                #only do this if not in single file mode (or else wverything else gets deleted!)               
861                if ((self.indFileToIngest == "") & (feed)):
862                        for item in filesPresentList:
863                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
864                                DeleteRecord(item)
865                                self.deletedFailList.append(item)
866                                self._no_files_deleted += 1
867                       
868                       
869                self._backupAndCleanData()
870               
871                #at this stage put the reporting code in (will work for oai or atom feed)
872                #create a summary file for each data centre ingest
873                data_dir = self._base_dir + "data/"
874                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
875                recOpFile = open(recOpFileName,'w')
876               
877                logging.info("oai_document_ingest processing complete:")
878               
879                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
880                self.thisIngestEndDate = str(datetime.datetime.now())
881               
882                messageSeparator = "\n\n********************************************************************************"
883               
884                message = "Ingest report for data centre: " + datacentre + "\n"
885                message = message + "Ingest date: " + self.thisIngestEndDate + "\n"
886                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
887                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
888                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
889                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
890                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
891                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
892                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
893               
894                if warningCounter > 0:
895                        message = message + messageSeparator
896                        message = message + "\nINGEST WARNINGS:  " + str(warningCounter) + "\n"
897                       
898                        message = message +  "\nBelow are the files that had ingest warnings...\n\n"
899                       
900                        for warnFile in self.warnMessageList.keys():
901                                message = message + "\nWARNING_FILE " + self.originalsDiscoveryFilesMap[warnFile] + "    " + self.warnMessageList[warnFile] + "\n"
902                       
903               
904                if self._no_problem_files > 0: 
905                        message = message + messageSeparator
906                        message = message + "\nPROBLEM_FILES: " + str(self._no_problem_files)  + "\n"
907                       
908               
909                if len(self.updateFailList) > 0:
910                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
911                                       
912                        for badFile in self.updateFailList:
913                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
914                               
915                                #NOTE: there may not be a problem message for every file that failed to ingest..
916                                if badFile in self.problemMessageList.keys():
917                                        message = message +"\nPROBLEM_FILE " + badFile + "\n"
918                                        message = message + self.problemMessageList[badFile] + "\n"
919                               
920               
921                if len(self.badXMLfileList.keys()) > 0:
922                        message = message + messageSeparator
923                        message = message +  "\nBelow are the files that could not be parsed (i.e. bad XML?)...\n\n"
924                        for badXmlFile in self.badXMLfileList.values():
925                                message = message + "BAD_XML_FILE: " + badXmlFile +"\n"
926               
927                                               
928                #log processing results to ingestLogging table if required to           
929               
930                if self.ingestProcessID is not None:
931                        logging.info("Updating ingest logging database")
932                                               
933                        sqlLoggingCmd = "select add_ingest_stats(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s');"%(self.ingestProcessID,self.thisIngestStartDate,self.thisIngestEndDate,numfilesproc+badXmlFiles,numfilesproc,badXmlFiles,self._no_files_ingested,self._no_files_changed,self._no_files_deleted,self._no_problem_files)
934                                       
935                        #update ingestLogging                   
936                        self.pgc_IngestLog.runSQLCommand(sqlLoggingCmd)
937                       
938                        #now update problem file log if there are problem files
939                        if self._no_problem_files > 0:
940                                logging.info("There are %s problem files - recording these in stats logging db" %self._no_problem_files)
941                               
942                                #note sometimes message will not always be recorded, so just record name of file
943                                if len(self.problemMessageList.keys()) < 1:
944                                       
945                                        for badFile in self.updateFailList:
946                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,'Could not catch precise error for this file!!')
947                               
948                                                #update ingestLogging           
949                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)
950                               
951                                else:
952                                        for badFile in self.problemMessageList.keys():                                 
953                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,self.problemMessageList[badFile])
954                                                       
955                                                #update ingestLogging                   
956                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)               
957                               
958                                       
959                        #if process id has been supplied, then update the logging db with a "start_ingest" (only for DPWS ops)
960                        if self.procID is None:
961                                sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
962                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
963                       
964                #update with content from the problem messages recorded
965                #message = message + ingestProbMsg + "\n"               
966                         
967                recOpFile.write(message)
968                               
969                return numfilesproc,badXmlFiles, message
970
971
972
973        def _setupXQueries(self):
974                '''
975                Set up the required XQueries
976                - NB, extract the xquery libraries locally for easy reference
977                '''
978                self._xq = ndgResources()
979                for libFile in self._xq.xqlib:
980                        # NB, we don't want the full path to the files - just the filename
981                        fileName = libFile.split('/')[-1]
982                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.