source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 7857

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@7857
Revision 7857, 39.4 KB checked in by sdonegan, 10 years ago (diff)

Updated to take data centre configuration settings from dpws db when a dpws activated ingest is started

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.ERROR
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False         
68                checkProcID = False
69                for arg in args:
70                       
71                        keywordArgs=['ingestFromDate','individualFile','interval','ingestFromDate',\
72                                                'eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile', \
73                                                'ingestProcessID']
74                                               
75                       
76                        bits = arg.split('=')
77                        if len(bits) == 2:
78                                if bits[0] == keywordArgs[3]:
79                                        self.setIngestFromDate(bits[1])
80                                elif bits[0] == keywordArgs[2]:
81                                        self.setPollInterval(bits[1])
82                                elif bits[0] == keywordArgs[7]:
83                                        print " - Running with specified config file!"
84                                        self.setOaiConfigFile(bits[1])
85                                        checkConfInArgs = True
86                                elif bits[0] == keywordArgs[1]:
87                                        print " - Running in single file ingestion mode!"
88                                        self.setIndFileToIngest(bits[1])
89                                elif bits[0] == keywordArgs[8]:
90                                        print " - WARNING: will log results of this ingestion process into the specified logging database! (NOTE: will also take location of metadata from database as this option intended for DPWS!!)"
91                                        self.setIngestLogID(bits[1])
92                                        checkProcID = True                             
93                                elif bits[0] not in keywordArgs:
94                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
95                                        sys.exit(2)
96                                else:
97                                        setattr(self, bits[0], bits[1])
98                       
99                if checkConfInArgs is False:
100                        print "\nWARNING: Ingest config file has not been specified. Using local default"
101                        self.setOaiConfigFile('./oai_document_ingester.config')
102                       
103               
104               
105                print self.lineSeparator
106               
107                # NB, this is a slight fudge as cannot get the detailed logging to work
108                # without setting up a new logger here - which means we get two loggers
109                # outputing data. The initial call to logging needs to be tracked down
110                # and configured correctly, so this can be avoided...
111#               self.logger = logging.getLogger()
112#               self.logger.setLevel(loggingLevel)
113
114                # create console handler and set level to debug
115#               ch = logging.StreamHandler()
116#               ch.setLevel(loggingLevel)
117                # create formatter
118#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
119                # add formatter to ch
120#               ch.setFormatter(formatter)
121                # add ch to logger
122#               self.logger.addHandler(ch)
123               
124                return args
125
126
127        def getID(self, filename):
128                '''
129                Gets the identifier out of an input metadata xml record.
130                @param filename - name of document file being processed
131                @return: ID - id to use to refer to the document
132                '''
133                logging.info("Retrieving identifier for metadata record " + filename)
134       
135                xml=file(filename).read()
136               
137                ID = idget(xml)
138               
139                return ID
140       
141       
142        def addFileToPostgresDB(self, filename):
143                '''
144                Add a file to the postgres DB - extracting and storing all the required
145                data in the process
146                @param filename: full path of file to add to postgres DB
147                '''
148                logging.info("Adding file, " + filename + ", to postgres DB")
149               
150                numSlash = len(filename.split('/'))
151                               
152                shortFilename = filename.split('/')[numSlash - 1]
153               
154                self.recordAttemptToIngestDiscoveryID = ""
155                self.ingestWarningMessage = ""
156               
157                self.ingestProblemMessage = ""
158               
159                #need to generate a local version of the iso data model (first call was to run through all available files
160                #and convert & place into relevant dirs as necessary
161                #generate the ISO object from the converted ISO here now..
162               
163                try:
164                                                                                                                                       
165                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
166                       
167                                                       
168                        if self.isoDataModel.createISOdataStructure() is True:
169                                logging.info("ISO extractor worked fine!")
170                       
171                       
172                        elif self.isoDataModel.createISOdataStructure() is True:
173                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
174                                                #sys.exit() # TODO fix these
175                                return
176                        else:
177                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
178                                                #sys.exit()
179                                return
180                except:
181                        logging.error("Could not generate ISO XML!")
182                        return
183               
184                #get the converted ISO into a local var
185                try:
186                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")                   
187                        self.isoXML = file(filename).read()
188                        self.originalXMLdoc = self.isoXML
189                                       
190                except:
191                        logging.warn("Could not extract converted ISO xml to local variable!")
192                        self.isoXML = ''
193                               
194               
195                # first of all create a PostgresRecord - this object represents all the data required
196                # for a DB entry
197                dao = None
198               
199                try:
200                       
201                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
202                       
203                        #record whats bout to be ingested - if a problem this, error should already have been kicked out
204                        mappingIndex = self.isoDataModel.findTheListData(self.isoDataModel.datasetID)                                   
205                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[mappingIndex][0]                   
206                       
207                        if self.isoDataModel.validDoc is True:                 
208                                                               
209                                #record whats attempting to be ingested
210                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
211                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
212                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML, difXML = self.difXML, isoXML = self.isoXML)                   
213                               
214                                # Now create the data access object to interface to the DB                     
215                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc, dpwsID = self._DPWS_ID )
216                       
217                                # Finally, write the new record
218                                # 0 if failed, 1 if update, 2 if create                         
219                                returnCode = dao.createOrUpdateRecord()
220                                                               
221                                if dao.processingWarning != '':
222                                        self.ingestWarningMessage = dao.processingWarning
223                                                                               
224                                if returnCode == 2:
225                                        self._no_files_ingested += 1
226                               
227                                elif returnCode == 1:
228                                        self._no_files_changed += 1
229                                       
230                        else:
231                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
232                               
233                                logging.warn(self.ingestProblemMessage)
234                               
235                                #now raise error so can record this
236                                raise ValueError,self.ingestProblemMessage
237                       
238                except:
239                       
240                       
241                        #if error encountered, add to failure lisr
242                        logging.error("Could not update: " + filename)                 
243                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
244                        self.updateFailList.append(originalRecordFilename)
245                       
246                        logging.error("Exception thrown - detail: ")
247                        errors = sys.exc_info()
248                        logging.error(errors)
249                        self._error_messages += "%s\n" %str(errors[1])
250                       
251                        if dao:
252                                logging.info("Removing record and its associated info from DB")
253                                logging.info("- to allow clean ingestion on rerun")
254                                try:
255                                        dao.deleteOriginalRecord()
256                                except:
257                                        logging.error("Problem encountered when removing record: ")
258                                        logging.error(sys.exc_info())
259                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
260
261                        self._no_problem_files += 1
262                       
263                        logging.info("Continue processing other files")
264                       
265                return self.recordAttemptToIngestDiscoveryID
266                       
267       
268        def getProcessingConfig(self,configFilePath):
269
270                # Check this file exists; if not, assume an invalid datacentre has been specified               
271                if not os.path.isfile(configFilePath):
272                    sys.exit("ERROR: Could not find the processing config file")
273                   
274                processingConfig = {}
275                   
276                processing_config_file = open(configFilePath, "r")
277               
278                for line in processing_config_file.readlines():
279                        words  = string.split(line)
280                        if len(words) == 0:
281                                continue
282                        elif words[0] == 'code_directory':
283                                processingConfig['code_directory'] = words[1]
284                                                                                               
285                        elif words[0] == 'base_directory':
286                                processingConfig['base_directory'] = words[1]
287                               
288                        elif words[0] == 'reporting_directory':
289                                processingConfig['reporting_directory'] = words[1]
290                               
291                        elif words[0] == 'passwords_file':
292                                processingConfig['passwords_file'] = words[1]
293                               
294                        elif words[0] == 'datcentre_configs':
295                                processingConfig['datcentre_configs'] = words[1]
296                                       
297                        elif words[0] == 'changeURLs':
298                                processingConfig['changeURLs'] = words[1]
299                               
300                        elif words[0] == 'NDG_redirect_URL':
301                                processingConfig['NDG_redirect_URL'] = words[1]
302                               
303                        elif words[0] == 'ingestLoggingConfig':
304                                processingConfig['ingestLoggingConfig'] = words[1]
305                               
306                        elif words[0] == 'ingestConfig':
307                                if not os.path.isfile(words[1]):
308                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
309                                else:
310                                        processingConfig['ingestConfig'] = words[1]
311                               
312                        elif words[0] == 'acceptedFormats':
313                                processingConfig['acceptedFormats'] = words[1]
314                               
315                        elif words[0] == 'saxonJarFile':
316                                if not os.path.isfile(words[1]):
317                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
318                                else:
319                                        processingConfig['saxonJarFile'] = words[1]
320                        elif words[0] == 'xpathIsoClass':
321                               
322                                #build a dictionary with format as the key                             
323                                isoClass = words[1].split(":")                         
324                               
325                                if 'xpathIsoClass' not in processingConfig.keys():
326                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
327                                else:
328                                        temp = processingConfig['xpathIsoClass']                                       
329                                        temp.update({isoClass[0]:isoClass[1]})
330                                        processingConfig['xpathIsoClass'] = temp
331                               
332                        elif words[0] == 'xqueryStubIsoGen':
333                               
334                                #build a dictionary with format as the key
335                                isoClass = words[1].split(":")
336                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
337                               
338                        elif words[0] == 'xqueryConversions':
339                               
340                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
341                                processingConfig['xqueryConversions'] = words[1].split(",")
342                               
343                        elif words[0] == 'currentMEDIN':
344                               
345                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
346                                processingConfig['currentMEDIN'] = words[1]
347                               
348                        elif words[0] == 'exceptXqConv':
349                               
350                                #build a dictionary with format as the key
351                                xqClass = words[1].split(":")
352                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
353                               
354                        elif words[0] == 'xqDocTypes':
355                               
356                                #build a dictionary with format as the key
357                                xqClass = words[1].split(":")
358                               
359                                if 'xqDocTypes' not in processingConfig.keys():
360                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
361                                else:
362                                        temp = processingConfig['xqDocTypes']                                   
363                                        temp.update({xqClass[0]:xqClass[1]})
364                                        processingConfig['xqDocTypes'] = temp
365                                       
366                processing_config_file.close()
367                       
368                return processingConfig
369       
370       
371        def getConfigDetails(self, datacentre, dpwsID = None):
372                '''
373                Determine whether config details are to be picked up from local config file (for backwards compatibility)
374                or from the DPWS db and set the config params as appropriate
375                @param datacentre: datacentre to use when looking up config file
376                @param dpwsFormat: value extracted from dpws db for data format
377                @param dpwsConfig: resultset containing info from dpws provider_common table           
378                '''             
379                if dpwsID is not None:
380                        self.getConfigDetailsFromDPWS(dpwsID)
381                else:
382                        self.getConfigDetailsFromFile(datacentre)
383                       
384               
385        def getConfigDetailsFromDPWS(self,dpwsID):
386                '''
387                Get the harvested records directory and groups for this datacentre from the
388                datacentre specific DPWS db based on the id.  The harvested records directory depends on the
389                datacentres OAI base url, the set and format. These have to be know up-front.
390                The groups denote which 'portal groups' they belong to - for limiting searches to
391                say NERC-only datacentres records.
392                Groups are added to the intermediate MOLES when it is created.
393                @param datacentre: datacentre to use when looking up config file
394                '''
395               
396                #to start with just use a simple sql cmd until new db is ready and sql function installed               
397                self._DPWS_ID = str(dpwsID)
398               
399                #relic to keep rest of ingester data model happy
400                self._datacentre_groups = ""
401               
402                if self._DPWS_ID == "":
403                        #sys.exit("No DPWS ID entered for this provider!")
404                        raise ValueError("No DPWS ID entered for this provider!")
405                               
406                #now set the relevant variables         
407                self._datacentre_format = ""
408                try:
409                        #need to also get format held in a different table
410                        sqlCmd = "select format from oai_provider where common_id = '%s';" %self._DPWS_ID               
411                        self._datacentre_format = self.pgc_IngestLog.runSQLCommand(sqlCmd)[0][0]
412                        logging.info("format being harvested: " + self._datacentre_format)             
413                       
414                except:
415                        msg = "Could not extract metadata format from DPWS DB!"
416                        logging.error(msg)
417                        raise ValueError(msg)
418                       
419                self._datacentre_namespace = ""
420                self._NDG_dataProvider = False
421                self._PublishToCSW = False
422                self._AggregateDir = ""
423               
424                try:
425                        sqlCmd = "select csw_publish_dir,ndg_data_provider,name,data_marker,csw_publish from provider_common where id='%s';" %self._DPWS_ID
426                        configDetails = self.pgc_IngestLog.runSQLCommand(sqlCmd)
427                       
428                        self._datacentre_namespace = configDetails[0][3]
429                        logging.info("datacentre namespace: " + self._datacentre_namespace)             
430                       
431                        if configDetails[0][1] == 'True':
432                                self._NDG_dataProvider = True
433                                logging.info("Datacentre classified as an NDG data provider")
434                        else:
435                                logging.info("Datacentre is not classified as an NDG data provider")           
436                               
437                        if configDetails[0][4] == 'True':
438                                self._PublishToCSW = True
439                                self._AggregateDir = configDetails[0][0]
440                       
441                except:
442                        msg = "Could not extract configuration details from DPWS DB for provide id %s!" %self._DPWS_ID
443                        logging.error(msg)
444                        raise ValueError(msg)
445                               
446                logging.info(self.lineSeparator)
447                       
448               
449        def getConfigDetailsFromFile (self, datacentre):
450                '''
451                Get the harvested records directory and groups for this datacentre from the
452                datacentre specific config file.  The harvested records directory depends on the
453                datacentres OAI base url, the set and format. These have to be know up-front.
454                The groups denote which 'portal groups' they belong to - for limiting searches to
455                say NERC-only datacentres records.
456                Groups are added to the intermediate MOLES when it is created.
457                @param datacentre: datacentre to use when looking up config file
458                '''
459                # initialise the variables to retrieve from the config file
460                self._harvest_home = ""
461                self._datacentre_groups = ""
462                self._datacentre_format = ""
463                self._datacentre_namespace = ""
464                self._NDG_dataProvider = False
465                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
466               
467               
468                #note changed to production buildout path
469                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
470               
471                #fed up with this rubbish causing problems - use a normal file opener.
472                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
473               
474               
475                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
476                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
477               
478                # Check this file exists; if not, assume an invalid datacentre has been specified
479                if not os.path.isfile(self._datacentre_config_filename):
480                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
481                        "specified (%s) is invalid\n" %datacentre)
482                   
483               
484                datacentre_config_file = open(self._datacentre_config_filename, "r")
485               
486                for line in datacentre_config_file.readlines():
487                        words  = string.split(line)
488                        if len(words) == 0:
489                                continue
490                        elif words[0] == 'host_path':
491                                self._harvest_home = words[1].rstrip()
492                        elif words[0] == 'groups':
493                                self._datacentre_groups = words[1:]
494                        elif words[0] == 'format':
495                                self._datacentre_format = words[1]
496                        elif words[0] == 'namespace':
497                                self._datacentre_namespace = words[1]
498                        elif words[0] == 'NDG_dataProvider':
499                                self._NDG_dataProvider = True
500                        elif words[0] == 'feed':
501                                self._IngestViaFeed = True
502                        elif words[0] == 'dpws_id':
503                                self._DPWS_ID = words[1]
504                        elif words[0] == 'csw_publish':
505                                self._PublishToCSW = words[1]
506                        elif words[0] == 'csw_publish_dir':
507                                self._AggregateDir = words[1]
508                       
509                datacentre_config_file.close()
510               
511                #stop ingest on this datacentre if process thread is OAI and the config says FEED
512                if ((self.processThread == 'OAI') and self._IngestViaFeed):
513                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
514                                               
515                if self._harvest_home == "":
516                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
517               
518                logging.info("harvested records are in " + self._harvest_home)
519               
520                if self._datacentre_groups == "":
521                    logging.info("No groups/keywords set for datacentre " + datacentre)
522                else:
523                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
524               
525                if self._datacentre_format == "":
526                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
527               
528                logging.info("format being harvested: " + self._datacentre_format)
529               
530                if self._datacentre_namespace == "":
531                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
532               
533                logging.info("datacentre namespace: " + self._datacentre_namespace)
534               
535                if self._DPWS_ID == "":
536                        sys.exit("No DPWS ID entered for this provider!")
537               
538                if self._NDG_dataProvider:
539                        logging.info("Datacentre classified as an NDG data provider")
540                else:
541                        logging.info("Datacentre is not classified as an NDG data provider")
542                logging.info(self.lineSeparator)
543               
544               
545        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
546                '''
547                Processes/renames the files (changed 08/01/07 to get id from inside file)
548                 - also replace any namespace declarations with a standard one which we know works in NDG
549                 NB, this copies files from the original dir to the discovery dir
550                 @param originals_dir: directory to convert files from
551                 @param discovery_dir: directory in which converted files will end up
552                 @return numfilesproc: counter of number of files processed
553                '''
554                numfilesproc = 0
555                badXMLfiles = 0         
556               
557                self.inputFileOrigFinal = {}
558                self.badXMLfileList = {}               
559                self.difXML = None
560               
561                #create dictionary for a mapping between original files and converted discovery files
562                self.originalsDiscoveryFilesMap = {}
563               
564                logging.info(self.lineSeparator)
565                logging.info("Renaming files:")
566               
567                if self.ingestProcessID is not None and self.procID is None:
568                       
569                        self._getPostgresDBConnectionLogging()
570                                               
571                        #if process id has been supplied, then update the logging db with a "start_ingest"
572                        sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
573                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
574                       
575                        sqlStatusCmd = "select update_ingest_info (%s, ' (preprocessing...)');" %(self.ingestProcessID)
576                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
577               
578                for filename in os.listdir(originals_dir):
579                       
580                        if not filename.endswith('.xml'):
581                                logging.warning('File %s is not xml format. Not processed'  %(filename))
582                                continue
583                       
584                        original_filename = originals_dir + filename
585                       
586                        #get the name of the file to be used in the xquery
587                        metadataFilename = filename.replace('.xml', '')
588                               
589                       
590                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
591                        if format == 'DIF_9.4':
592                                                       
593                                logging.info("Converting DIF to stub ISO...")                                   
594                               
595                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
596                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
597                                                                                                               
598                                #gets the metadata id from the xml
599                                metadataID=self.getID(original_filename)
600                                #import pdb
601                                #pdb.set_trace()
602                                                               
603                                repositoryName = self._datacentre_namespace
604                               
605                                                               
606                                #where is the file to be ingested located?
607                                metadataFileLoc = originals_dir
608                               
609                                #generate a new stubISO filename
610                                self.isoFormat= "stubISO"
611                                                               
612                               
613                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
614                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
615                                                                                                                       
616                                #get hold of the xml for the transformed ISO                           
617                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
618                                                               
619                                #create the iso file on the system
620                                tempStubISOfile = original_filename + ".stubISO.xml"
621                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
622                               
623                                #generate the ISO object from the converted ISO here now..
624                                try:
625                                       
626                                        #temp assignment so failure mode supported
627                                        new_filename_short = original_filename
628                                                                       
629                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
630                               
631                                        if isoDataModel.createISOdataStructure() is True:
632                                                logging.info("ISO extractor worked fine!")
633                                               
634                                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
635                                                self.originalXMLdoc = ''
636                               
637                                                try:
638                                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
639                                                        self.originalXMLdoc = file(original_filename).read()
640                                                        self.difXML = self.originalXMLdoc
641                                       
642                                                except:
643                                                        logging.warn("Could not extract original file to local variable!")
644                                                       
645                                               
646                                                #Back to main stream of ISO ingest
647                                                if self._NDG_dataProvider:
648                                       
649                                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
650                                                               
651                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
652                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
653                                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
654                               
655                                       
656                                                else:
657                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)
658                                                        ident = isoDataModel.datasetID[mappingIndex][0]
659                                       
660                                                        ident = ident.replace(":", "-")
661                                                        ident = ident.replace("/", "-")
662                               
663                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
664                                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
665                               
666                                       
667                               
668                                                #now create this stub ISO file on system so can access it
669                                                self.genIsoFile = discovery_dir + new_filename_short   
670                                                               
671                                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
672                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
673                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
674                                               
675                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
676                                                #this links a derived filename in processing dir with original filename
677                                                #get basic filename from the path+filename passed to this function to use as key               
678                                                self.inputFileOrigFinal[new_filename_short]=filename
679                                               
680                                                numfilesproc += 1
681                       
682                       
683                                        elif isoDataModel.createISOdataStructure() is True:
684                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
685                                                #sys.exit() # TODO fix these
686                                               
687                                        else:
688                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
689                                                badXMLfiles += 1
690                                                self.badXMLfileList[new_filename_short]=filename
691                                                                       
692                                except:
693                                        logging.error("Could not generate ISO XML from DIF format!!")
694                                        badXMLfiles += 1
695                                        self.badXMLfileList[new_filename_short]=filename
696                                       
697                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
698                               
699                       
700                        #all other ISO formats/profiles to deal with.
701                        else:                           
702                                isoFormat = format
703                               
704                                self.originalXMLdoc = "null"
705                               
706                               
707                                #generate the ISO object from the ISO xml..                                                                             
708                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
709                               
710                                if isoDataModel.createISOdataStructure() is True:
711                                                logging.info("ISO extractor worked fine!")
712                                               
713                                                #Back to main stream of ISO ingest
714                                                if self._NDG_dataProvider:
715                                                               
716                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
717                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
718                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
719                               
720                                                else:
721                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)     
722                                                        ident = isoDataModel.datasetID[mappingIndex][0]
723                                                       
724                                                        ident = ident.replace(":", "-")
725                                                        ident = ident.replace("/", "-")
726                               
727                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
728                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
729                               
730                                                #now create this stub ISO file on system so can access it
731                                                self.genIsoFile = discovery_dir + new_filename_short   
732                               
733                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
734                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
735                               
736                                                #get the converted ISO into a local var
737                                                try:
738                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
739                                                        self.isoXML = file(self.genIsoFile).read()
740                                                        self.originalXMLdoc = self.isoXML
741                                       
742                                                except:
743                                                        logging.warn("Could not extract converted ISO xml to local variable!")
744                                                        self.isoXML = ''
745                                                       
746                                                logging.info("original file = " + original_filename)
747                                                logging.info("newfile = " + new_filename)
748                               
749                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
750                                                #this links a derived filename in processing dir with original filename
751                                                #get basic filename from the path+filename passed to this function to use as key               
752                                                self.inputFileOrigFinal[new_filename_short]=filename
753                                               
754                                                numfilesproc += 1
755                                                                                               
756                                elif isoDataModel.createISOdataStructure() is True:
757                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
758                                                #sys.exit() # TODO fix these
759                                       
760                                else:                                   
761                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
762                                        badXMLfiles += 1
763                                        self.badXMLfileList[new_filename_short]=filename
764                               
765                       
766                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
767                       
768                logging.info("File renaming and converting completed")
769                logging.info(self.lineSeparator)
770               
771                       
772                return numfilesproc,badXMLfiles
773       
774       
775        def _getPostgresDBConnection(self):
776                '''
777                Get the default postgres DB connection - by reading in data from the db config file
778                '''
779               
780                logging.debug("Setting up connection to postgres DB")
781                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
782               
783               
784                self.pgc = pgc(configFile = self._databaseConfigurationFile)
785                logging.info("Postgres DB connection now set up")
786               
787               
788       
789        def _getPostgresDBConnectionLogging(self):
790                '''
791                Get the default postgres DB connection - by reading in data from the db config file
792                '''
793               
794                logging.debug("Setting up connection to postgres DB")
795                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
796               
797               
798                self.pgc_IngestLog = pgc(configFile = self.ingestLoggingDatabaseConfigurationFile)
799                logging.info("Postgres DB connection now set up")
800
801
802
803        def     _backupAndCleanData(self):
804                '''
805                Back up ingested data for specified data centre, then clean any of
806                the ingest dirs
807                '''
808                logging.info("Backing up ingested data")
809                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
810                  strftime("%y%m%d_%H%M") + "_originals/"
811               
812                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
813                logging.info("Data backed up - now clearing ingest directories")
814                #Clear out the original harvest records area and discovery dir
815                #FileUtilities.cleanDir(self.originals_dir)
816                #FileUtilities.cleanDir(self.discovery_dir)
817                logging.info("Ingest directories cleared")
818
819
820        def     _setupDataCentreDirs(self):
821                '''
822                Set up directories appropriate for the current data centre
823                '''
824                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
825               
826               
827                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
828               
829                # the following dirs define where the specific documents should go
830                self.originals_dir = data_dir + "/oai/originals/" # copy of original
831                self.discovery_dir = data_dir + "/discovery/"
832                #self.stubISO_dir = data_dir + "/stub_iso/"
833               
834                # Create/clear the 'in' and 'out' directories
835                FileUtilities.setUpDir(self.originals_dir)
836                FileUtilities.setUpDir(self.discovery_dir)
837               
838                logging.info("Ingest directories for data centre set up")
839
840
841        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
842                '''
843                Convert files from originals dir to discovery one, then
844                ingest, backup and clean
845                @param originals_dir: directory to ingest docs from
846                @param discovery_dir: directory to use to process ingested docs
847                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
848                '''
849               
850                self.thisIngestStartDate = str(datetime.datetime.now())
851               
852                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
853               
854                filenames = os.listdir(self.discovery_dir)
855               
856                ingestProbMsg = ""
857                ingestWarnMsg = ""
858                               
859                #generate a list of files ALREADY in database so can compare with what has been ingested
860                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
861                               
862                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
863                filesPresentList=[]
864       
865               
866                if filePresentListArr:
867                        for fileArr in filePresentListArr:
868                                filesPresentList.append(fileArr[0])
869                               
870                                #TODO - is above relevant - duplicating functionlaity?
871       
872                #create list to to hold files ingest failed on.
873                self.updateFailList = []
874                self.deletedFailList = []
875                self.problemMessageList = {}   
876                self.warnMessageList = {}
877               
878                counter = 1
879                warningCounter = 0
880                               
881                for filename in filenames:
882                       
883                        fullPath = self.discovery_dir + filename
884                       
885                        #if process id has been supplied, then update the logging db with a "start_ingest"
886                        if self.ingestProcessID is not None and self.procID is None:
887                                sqlStatusCmd = "select update_ingest_info (%s, ' (ingesting %s of %s)');" %(self.ingestProcessID,counter,numfilesproc)                 
888                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
889                       
890                       
891                        if os.path.isfile(fullPath):
892                                       
893                                thisIngestedID = self.addFileToPostgresDB(fullPath)
894                               
895                                #record all problem messages
896                                #ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
897                                if self.ingestProblemMessage != '':
898                                        ingestProbMsg = ingestProbMsg + self.ingestProblemMessage                                       
899                                        self.problemMessageList[self.originalsDiscoveryFilesMap[filename]] = self.ingestProblemMessage
900                                       
901                                else:
902                                       
903                                        #at this point, check to see if record can be aggregated for publishing to CSW?
904                                        if self._PublishToCSW == "True":
905                                               
906                                                #copy file to specified aggregation directory
907                                                FileUtilities.copyFileToDir(fullPath, self._AggregateDir)
908                                       
909                                if self.ingestWarningMessage != '':
910                                        warningCounter += 1
911                                        ingestWarnMsg = ingestWarnMsg + self.ingestWarningMessage
912                                        self.warnMessageList[filename] = self.ingestWarningMessage
913                                       
914                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
915                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
916                                        if thisIngestedID in filesPresentList: 
917                                                filesPresentList.remove(thisIngestedID)
918                       
919                       
920                        counter += 1   
921               
922                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
923                #will need to be removed.
924               
925                #only do this if not in single file mode (or else wverything else gets deleted!)               
926                if ((self.indFileToIngest == "") & (feed)):
927                        for item in filesPresentList:
928                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
929                                DeleteRecord(item)
930                                self.deletedFailList.append(item)
931                                self._no_files_deleted += 1
932                       
933                       
934                self._backupAndCleanData()
935               
936                #at this stage put the reporting code in (will work for oai or atom feed)
937                #create a summary file for each data centre ingest
938                data_dir = self._base_dir + "data/"
939                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
940                recOpFile = open(recOpFileName,'w')
941               
942                logging.info("oai_document_ingest processing complete:")
943               
944                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
945                self.thisIngestEndDate = str(datetime.datetime.now())
946               
947                messageSeparator = "\n\n********************************************************************************"
948               
949                message = "Ingest report for data centre: " + datacentre + "\n"
950                message = message + "Ingest date: " + self.thisIngestEndDate + "\n"
951                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
952                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
953                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
954                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
955                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
956                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
957                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
958               
959                if warningCounter > 0:
960                        message = message + messageSeparator
961                        message = message + "\nINGEST WARNINGS:  " + str(warningCounter) + "\n"
962                       
963                        message = message +  "\nBelow are the files that had ingest warnings...\n\n"
964                       
965                        for warnFile in self.warnMessageList.keys():
966                                message = message + "\nWARNING_FILE " + self.originalsDiscoveryFilesMap[warnFile] + "    " + self.warnMessageList[warnFile] + "\n"
967                       
968               
969                if self._no_problem_files > 0: 
970                        message = message + messageSeparator
971                        message = message + "\nPROBLEM_FILES: " + str(self._no_problem_files)  + "\n"
972                       
973               
974                if len(self.updateFailList) > 0:
975                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
976                                       
977                        for badFile in self.updateFailList:
978                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
979                               
980                                #NOTE: there may not be a problem message for every file that failed to ingest..
981                                if badFile in self.problemMessageList.keys():
982                                        message = message +"\nPROBLEM_FILE " + badFile + "\n"
983                                        message = message + self.problemMessageList[badFile] + "\n"
984                               
985               
986                if len(self.badXMLfileList.keys()) > 0:
987                        message = message + messageSeparator
988                        message = message +  "\nBelow are the files that could not be parsed (i.e. bad XML?)...\n\n"
989                        for badXmlFile in self.badXMLfileList.values():
990                                message = message + "BAD_XML_FILE: " + badXmlFile +"\n"
991               
992                                               
993                #log processing results to ingestLogging table if required to           
994               
995                if self.ingestProcessID is not None:
996                        logging.info("Updating ingest logging database")
997                                               
998                        sqlLoggingCmd = "select add_ingest_stats(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s');"%(self.ingestProcessID,self.thisIngestStartDate,self.thisIngestEndDate,numfilesproc+badXmlFiles,numfilesproc,badXmlFiles,self._no_files_ingested,self._no_files_changed,self._no_files_deleted,self._no_problem_files)
999                                       
1000                        #update ingestLogging                   
1001                        self.pgc_IngestLog.runSQLCommand(sqlLoggingCmd)
1002                       
1003                        #now update problem file log if there are problem files
1004                        if self._no_problem_files > 0:
1005                                logging.info("There are %s problem files - recording these in stats logging db" %self._no_problem_files)
1006                               
1007                                #note sometimes message will not always be recorded, so just record name of file
1008                                if len(self.problemMessageList.keys()) < 1:
1009                                       
1010                                        for badFile in self.updateFailList:
1011                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,'Could not catch precise error for this file!!')
1012                               
1013                                                #update ingestLogging           
1014                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)
1015                               
1016                                else:
1017                                        for badFile in self.problemMessageList.keys():                                 
1018                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,self.problemMessageList[badFile])
1019                                                       
1020                                                #update ingestLogging                   
1021                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)               
1022                               
1023                                       
1024                        #if process id has been supplied, then update the logging db with a "start_ingest" (only for DPWS ops)
1025                        if self.procID is None:
1026                                sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
1027                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
1028                       
1029                #update with content from the problem messages recorded
1030                #message = message + ingestProbMsg + "\n"               
1031                         
1032                recOpFile.write(message)
1033                               
1034                return numfilesproc,badXmlFiles, message
1035
1036
1037
1038        def _setupXQueries(self):
1039                '''
1040                Set up the required XQueries
1041                - NB, extract the xquery libraries locally for easy reference
1042                '''
1043                self._xq = ndgResources()
1044                for libFile in self._xq.xqlib:
1045                        # NB, we don't want the full path to the files - just the filename
1046                        fileName = libFile.split('/')[-1]
1047                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.