source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 7975

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@7975
Revision 7975, 36.8 KB checked in by sdonegan, 8 years ago (diff)

Updates added now must ALWAYS specify config file..

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24from SchematronValidate import SchematronValidate as Schematron
25
26class AbstractDocumentIngester(object):
27        '''
28        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
29        - including running the various transforms and parsings to get all doc types and spatiotemporal
30        data in the correct form in the DB
31        '''
32        lineSeparator = "------------------------------"
33               
34        # The directory to put things for a tape backup (should already exist)
35        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
36       
37        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
38               
39        def _setupCmdLineOptions(self):
40                '''
41                Determine the logging level to use and configure this appropriately
42                @return args: any input arguments - excluding options
43                '''
44               
45                # check for verbose option
46                try:
47                        opts, args = getopt.getopt(sys.argv[1:], "vd")
48                except getopt.GetoptError, err:
49                    # print help information and exit:
50                    print str(err) # will print something like "option -a not recognized"
51                    sys.exit(2)
52                   
53                if len(args) < 1:
54                        self.usage()
55                   
56                loggingLevel = logging.ERROR
57                for o, a in opts:
58                    if o == "-v":
59                        print " - Verbose mode ON"
60                        loggingLevel = logging.INFO
61                    elif o == "-d":
62                        print " - Debug mode ON"
63                        loggingLevel = logging.DEBUG
64                   
65               
66                # set up any keywords on the object
67                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
68                checkConfInArgs = False         
69                checkProcID = False
70                for arg in args:
71                       
72                       
73                        keywordArgs=['ingestFromDate','individualFile','interval','ingestFromDate',\
74                                                'eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile', \
75                                                'ingestProcessID']
76                                               
77                       
78                        bits = arg.split('=')
79                        if len(bits) == 2:
80                                if bits[0] == keywordArgs[3]:
81                                        self.setIngestFromDate(bits[1])
82                                elif bits[0] == keywordArgs[2]:
83                                        self.setPollInterval(bits[1])
84                                elif bits[0] == keywordArgs[7]:
85                                        print " - Running with specified config file!"
86                                        self.setOaiConfigFile(bits[1])
87                                        checkConfInArgs = True
88                                elif bits[0] == keywordArgs[1]:
89                                        print " - Running in single file ingestion mode!"
90                                        self.setIndFileToIngest(bits[1])
91                                elif bits[0] == keywordArgs[8]:
92                                        print " - WARNING: will log results of this ingestion process into the specified logging database! (NOTE: will also take location of metadata from database as this option intended for DPWS!!)"
93                                        self.setIngestLogID(bits[1])
94                                        checkProcID = True                             
95                                elif bits[0] not in keywordArgs:
96                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
97                                        sys.exit(2)
98                                else:
99                                        setattr(self, bits[0], bits[1])
100                       
101                if checkConfInArgs is False:
102                        print "\nWARNING: Ingest config file has not been specified. Now MANDATORY!"
103                        sys.exit(2)
104                       
105               
106               
107                print self.lineSeparator
108               
109                # NB, this is a slight fudge as cannot get the detailed logging to work
110                # without setting up a new logger here - which means we get two loggers
111                # outputing data. The initial call to logging needs to be tracked down
112                # and configured correctly, so this can be avoided...
113#               self.logger = logging.getLogger()
114#               self.logger.setLevel(loggingLevel)
115
116                # create console handler and set level to debug
117#               ch = logging.StreamHandler()
118#               ch.setLevel(loggingLevel)
119                # create formatter
120#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
121                # add formatter to ch
122#               ch.setFormatter(formatter)
123                # add ch to logger
124#               self.logger.addHandler(ch)
125               
126                return args
127
128
129        def getID(self, filename):
130                '''
131                Gets the identifier out of an input metadata xml record.
132                @param filename - name of document file being processed
133                @return: ID - id to use to refer to the document
134                '''
135                logging.info("Retrieving identifier for metadata record " + filename)
136       
137                xml=file(filename).read()
138               
139                ID = idget(xml)
140               
141                return ID
142       
143       
144        def addFileToPostgresDB(self, filename):
145                '''
146                Add a file to the postgres DB - extracting and storing all the required
147                data in the process
148                @param filename: full path of file to add to postgres DB
149                '''
150                logging.info("Adding file, " + filename + ", to postgres DB")
151               
152                numSlash = len(filename.split('/'))
153                               
154                shortFilename = filename.split('/')[numSlash - 1]
155               
156                self.recordAttemptToIngestDiscoveryID = ""
157                self.ingestWarningMessage = ""
158               
159                self.ingestProblemMessage = ""
160               
161                #need to generate a local version of the iso data model (first call was to run through all available files
162                #and convert & place into relevant dirs as necessary
163                #generate the ISO object from the converted ISO here now..
164               
165                try:
166                                                                                                                                       
167                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
168                       
169                                                       
170                        if self.isoDataModel.createISOdataStructure() is True:
171                                logging.info("ISO extractor worked fine!")
172                       
173                       
174                        elif self.isoDataModel.createISOdataStructure() is True:
175                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
176                                                #sys.exit() # TODO fix these
177                                return
178                        else:
179                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
180                                                #sys.exit()
181                                return
182                except:
183                        logging.error("Could not generate ISO XML!")
184                        return
185               
186                #get the converted ISO into a local var
187                try:
188                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")                   
189                        #self.isoXML = file(filename).read()
190                        #self.originalXMLdoc = self.isoXML
191                       
192                        if not self.changeUrls:
193                                logging.info("No NDG redirection required so using original XML as is..")
194                                self.isoXML = file(filename).read()
195                                self.originalXMLdoc = self.isoXML
196                        else:
197                                #original method...                                                     
198                                self.isoXML = file(self.genIsoFile).read()
199                                self.originalXMLdoc = self.isoXML
200                                       
201                except:
202                        logging.warn("Could not extract converted ISO xml to local variable!")
203                        self.isoXML = ''
204                               
205               
206                # first of all create a PostgresRecord - this object represents all the data required
207                # for a DB entry
208                dao = None
209               
210                try:
211                       
212                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
213                       
214                        #record whats bout to be ingested - if a problem this, error should already have been kicked out
215                        mappingIndex = self.isoDataModel.findTheListData(self.isoDataModel.datasetID)                                   
216                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[mappingIndex][0]                   
217                       
218                        if self.isoDataModel.validDoc is True:                 
219                                                               
220                                #record whats attempting to be ingested
221                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
222                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
223                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML, difXML = self.difXML, isoXML = self.isoXML)                   
224                               
225                                #check whether NERC keyword is in the scope - if not- REJECT!                           
226                                if 'NERCUNDERSCOREDDC' not in record.getScopeInfo():
227                                        logging.error("No NERC keyword present")
228                                        raise ValueError ("NERC keyword is NOT present in record!")
229                               
230                                # Now create the data access object to interface to the DB                     
231                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc, dpwsID = self._DPWS_ID )
232                       
233                                # Finally, write the new record
234                                # 0 if failed, 1 if update, 2 if create                         
235                                returnCode = dao.createOrUpdateRecord()
236                                                               
237                                if dao.processingWarning != '':
238                                        self.ingestWarningMessage = dao.processingWarning
239                                                                               
240                                if returnCode == 2:
241                                        self._no_files_ingested += 1
242                               
243                                elif returnCode == 1:
244                                        self._no_files_changed += 1
245                                       
246                        else:
247                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
248                               
249                                logging.warn(self.ingestProblemMessage)
250                               
251                                #now raise error so can record this
252                                raise ValueError,self.ingestProblemMessage
253                       
254                except:
255                       
256                       
257                        #if error encountered, add to failure lisr
258                        logging.error("Could not update: " + filename)                 
259                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
260                        self.updateFailList.append(originalRecordFilename)
261                       
262                        logging.error("Exception thrown - detail: ")
263                        errors = sys.exc_info()
264                        logging.error(errors)
265                        self._error_messages += "%s\n" %str(errors[1])
266                       
267                        if dao:
268                                logging.info("Removing record and its associated info from DB")
269                                logging.info("- to allow clean ingestion on rerun")
270                                try:
271                                        dao.deleteOriginalRecord()
272                                except:
273                                        logging.error("Problem encountered when removing record: ")
274                                        logging.error(sys.exc_info())
275                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
276
277                        self._no_problem_files += 1
278                       
279                        logging.info("Continue processing other files")
280                       
281                return self.recordAttemptToIngestDiscoveryID
282                       
283       
284        def getProcessingConfig(self,configFilePath):
285
286                # Check this file exists; if not, assume an invalid datacentre has been specified               
287                if not os.path.isfile(configFilePath):
288                    sys.exit("ERROR: Could not find the processing config file")
289                   
290                processingConfig = {}
291                   
292                processing_config_file = open(configFilePath, "r")
293               
294                for line in processing_config_file.readlines():
295                        words  = string.split(line)
296                        if len(words) == 0:
297                                continue
298                        elif words[0] == 'code_directory':
299                                processingConfig['code_directory'] = words[1]
300                                                                                               
301                        elif words[0] == 'base_directory':
302                                processingConfig['base_directory'] = words[1]
303                               
304                        elif words[0] == 'reporting_directory':
305                                processingConfig['reporting_directory'] = words[1]
306                               
307                        elif words[0] == 'passwords_file':
308                                processingConfig['passwords_file'] = words[1]
309                               
310                        elif words[0] == 'datcentre_configs':
311                                processingConfig['datcentre_configs'] = words[1]
312                                       
313                        elif words[0] == 'changeURLs':
314                                processingConfig['changeURLs'] = words[1]
315                               
316                        elif words[0] == 'NDG_redirect_URL':
317                                processingConfig['NDG_redirect_URL'] = words[1]
318                               
319                        elif words[0] == 'ingestLoggingConfig':
320                                processingConfig['ingestLoggingConfig'] = words[1]
321                               
322                        elif words[0] == 'ingestConfig':
323                                if not os.path.isfile(words[1]):
324                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
325                                else:
326                                        processingConfig['ingestConfig'] = words[1]
327                               
328                        elif words[0] == 'acceptedFormats':
329                                processingConfig['acceptedFormats'] = words[1]
330                               
331                        elif words[0] == 'saxonJarFile':
332                                if not os.path.isfile(words[1]):
333                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
334                                else:
335                                        processingConfig['saxonJarFile'] = words[1]
336                        elif words[0] == 'xpathIsoClass':
337                               
338                                #build a dictionary with format as the key                             
339                                isoClass = words[1].split(":")                         
340                               
341                                if 'xpathIsoClass' not in processingConfig.keys():
342                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
343                                else:
344                                        temp = processingConfig['xpathIsoClass']                                       
345                                        temp.update({isoClass[0]:isoClass[1]})
346                                        processingConfig['xpathIsoClass'] = temp
347                               
348                        elif words[0] == 'xqueryStubIsoGen':
349                               
350                                #build a dictionary with format as the key
351                                isoClass = words[1].split(":")
352                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
353                               
354                        elif words[0] == 'xqueryConversions':
355                               
356                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
357                                processingConfig['xqueryConversions'] = words[1].split(",")
358                               
359                        elif words[0] == 'currentMEDIN':
360                               
361                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
362                                processingConfig['currentMEDIN'] = words[1]
363                               
364                        elif words[0] == 'schematronLocation':
365                               
366                                #setup parameters for schematron validation
367                                processingConfig['schematronLocation'] = words[1]
368                               
369                        elif words[0] == 'schematronXSLloc':
370                               
371                                #setup parameters for schematron validation
372                                processingConfig['schematronXSLloc'] = words[1]
373                               
374                        elif words[0] == 'schematronValFail':
375                               
376                                #setup parameters for schematron validation
377                                processingConfig['schematronValFail'] = words[1]
378                               
379                        elif words[0] == 'exceptXqConv':
380                               
381                                #build a dictionary with format as the key
382                                xqClass = words[1].split(":")
383                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
384                               
385                        elif words[0] == 'xqDocTypes':
386                               
387                                #build a dictionary with format as the key
388                                xqClass = words[1].split(":")
389                               
390                                if 'xqDocTypes' not in processingConfig.keys():
391                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
392                                else:
393                                        temp = processingConfig['xqDocTypes']                                   
394                                        temp.update({xqClass[0]:xqClass[1]})
395                                        processingConfig['xqDocTypes'] = temp
396                                       
397                processing_config_file.close()
398                       
399                return processingConfig
400       
401       
402        def getConfigDetails(self, datacentre, dpwsID = None):
403                '''
404                Determine whether config details are to be picked up from local config file (for backwards compatibility)
405                or from the DPWS db and set the config params as appropriate
406                @param datacentre: datacentre to use when looking up config file
407                @param dpwsFormat: value extracted from dpws db for data format
408                @param dpwsConfig: resultset containing info from dpws provider_common table           
409                '''             
410                if dpwsID is not None:
411                        self.getConfigDetailsFromDPWS(dpwsID)
412                else:
413                        self.getConfigDetailsFromFile(datacentre)
414                       
415               
416        def getConfigDetailsFromDPWS(self,dpwsID):
417                '''
418                Get the harvested records directory and groups for this datacentre from the
419                datacentre specific DPWS db based on the id.  The harvested records directory depends on the
420                datacentres OAI base url, the set and format. These have to be know up-front.
421                The groups denote which 'portal groups' they belong to - for limiting searches to
422                say NERC-only datacentres records.
423                Groups are added to the intermediate MOLES when it is created.
424                @param datacentre: datacentre to use when looking up config file
425                '''
426               
427                #to start with just use a simple sql cmd until new db is ready and sql function installed               
428                self._DPWS_ID = str(dpwsID)
429               
430                #relic to keep rest of ingester data model happy
431                self._datacentre_groups = ""
432               
433                if self._DPWS_ID == "":
434                        #sys.exit("No DPWS ID entered for this provider!")
435                        raise ValueError("No DPWS ID entered for this provider!")
436                               
437                #now set the relevant variables         
438                self._datacentre_format = ""
439                try:
440                        #need to also get format held in a different table
441                        sqlCmd = "select format from oai_provider where common_id = '%s';" %self._DPWS_ID               
442                        self._datacentre_format = self.pgc_IngestLog.runSQLCommand(sqlCmd)[0][0]
443                        logging.info("format being harvested: " + self._datacentre_format)             
444                       
445                except:
446                        msg = "Could not extract metadata format from DPWS DB!"
447                        logging.error(msg)
448                        raise ValueError(msg)
449                       
450                self._datacentre_namespace = ""
451                self._NDG_dataProvider = False
452                self._PublishToCSW = False
453                self._AggregateDir = ""
454               
455                try:
456                        sqlCmd = "select csw_publish_dir,ndg_data_provider,name,data_marker,csw_publish from provider_common where id='%s';" %self._DPWS_ID
457                        configDetails = self.pgc_IngestLog.runSQLCommand(sqlCmd)
458                       
459                        self._datacentre_namespace = configDetails[0][3]
460                        logging.info("datacentre namespace: " + self._datacentre_namespace)             
461                       
462                        if configDetails[0][1] == 'True':
463                                self._NDG_dataProvider = True
464                                logging.info("Datacentre classified as an NDG data provider")
465                        else:
466                                logging.info("Datacentre is not classified as an NDG data provider")           
467                               
468                        if configDetails[0][4] == 'True':
469                                self._PublishToCSW = True
470                                self._AggregateDir = configDetails[0][0]
471                       
472                except:
473                        msg = "Could not extract configuration details from DPWS DB for provide id %s!" %self._DPWS_ID
474                        logging.error(msg)
475                        raise ValueError(msg)
476                               
477                logging.info(self.lineSeparator)
478                       
479               
480        def getConfigDetailsFromFile (self, datacentre):
481                '''
482                Get the harvested records directory and groups for this datacentre from the
483                datacentre specific config file.  The harvested records directory depends on the
484                datacentres OAI base url, the set and format. These have to be know up-front.
485                The groups denote which 'portal groups' they belong to - for limiting searches to
486                say NERC-only datacentres records.
487                Groups are added to the intermediate MOLES when it is created.
488                @param datacentre: datacentre to use when looking up config file
489                '''
490                # initialise the variables to retrieve from the config file
491                self._harvest_home = ""
492                self._datacentre_groups = ""
493                self._datacentre_format = ""
494                self._datacentre_namespace = ""
495                self._NDG_dataProvider = False
496                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
497               
498               
499                #note changed to production buildout path
500                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
501               
502                #fed up with this rubbish causing problems - use a normal file opener.
503                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
504               
505               
506                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
507                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
508               
509                # Check this file exists; if not, assume an invalid datacentre has been specified
510                if not os.path.isfile(self._datacentre_config_filename):
511                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
512                        "specified (%s) is invalid\n" %datacentre)
513                   
514               
515                datacentre_config_file = open(self._datacentre_config_filename, "r")
516               
517                for line in datacentre_config_file.readlines():
518                        words  = string.split(line)
519                        if len(words) == 0:
520                                continue
521                        elif words[0] == 'host_path':
522                                self._harvest_home = words[1].rstrip()
523                        elif words[0] == 'groups':
524                                self._datacentre_groups = words[1:]
525                        elif words[0] == 'format':
526                                self._datacentre_format = words[1]
527                        elif words[0] == 'namespace':
528                                self._datacentre_namespace = words[1]
529                        elif words[0] == 'NDG_dataProvider':
530                                self._NDG_dataProvider = True
531                        elif words[0] == 'feed':
532                                self._IngestViaFeed = True
533                        elif words[0] == 'dpws_id':
534                                self._DPWS_ID = words[1]
535                        elif words[0] == 'csw_publish':
536                                self._PublishToCSW = words[1]
537                        elif words[0] == 'csw_publish_dir':
538                                self._AggregateDir = words[1]
539                       
540                datacentre_config_file.close()
541               
542                #stop ingest on this datacentre if process thread is OAI and the config says FEED
543                if ((self.processThread == 'OAI') and self._IngestViaFeed):
544                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
545                                               
546                if self._harvest_home == "":
547                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
548               
549                logging.info("harvested records are in " + self._harvest_home)
550               
551                if self._datacentre_groups == "":
552                    logging.info("No groups/keywords set for datacentre " + datacentre)
553                else:
554                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
555               
556                if self._datacentre_format == "":
557                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
558               
559                logging.info("format being harvested: " + self._datacentre_format)
560               
561                if self._datacentre_namespace == "":
562                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
563               
564                logging.info("datacentre namespace: " + self._datacentre_namespace)
565               
566                if self._DPWS_ID == "":
567                        sys.exit("No DPWS ID entered for this provider!")
568               
569                if self._NDG_dataProvider:
570                        logging.info("Datacentre classified as an NDG data provider")
571                else:
572                        logging.info("Datacentre is not classified as an NDG data provider")
573                logging.info(self.lineSeparator)
574               
575               
576        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
577                '''
578                Processes/renames the files (changed 08/01/07 to get id from inside file)
579                 - also replace any namespace declarations with a standard one which we know works in NDG
580                 NB, this copies files from the original dir to the discovery dir
581                 @param originals_dir: directory to convert files from
582                 @param discovery_dir: directory in which converted files will end up
583                 @return numfilesproc: counter of number of files processed
584                '''
585               
586                numfilesproc = 0
587                badXMLfiles = 0         
588               
589                self.inputFileOrigFinal = {}
590                self.badXMLfileList = {}               
591                self.difXML = None
592               
593                #create dictionary for a mapping between original files and converted discovery files
594                self.originalsDiscoveryFilesMap = {}
595               
596                logging.info(self.lineSeparator)
597                logging.info("Renaming files plop:")
598               
599               
600                if self.ingestProcessID is not None and self.procID is None:
601                       
602                        self._getPostgresDBConnectionLogging()
603                                               
604                        #if process id has been supplied, then update the logging db with a "start_ingest"
605                        sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
606                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
607                       
608                        sqlStatusCmd = "select update_ingest_info (%s, ' (preprocessing...)');" %(self.ingestProcessID)
609                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
610               
611                for filename in os.listdir(originals_dir):
612                       
613                        if not filename.endswith('.xml'):
614                                logging.warning('File %s is not xml format. Not processed'  %(filename))
615                                continue
616                       
617                        original_filename = originals_dir + filename
618                       
619                        #get the name of the file to be used in the xquery
620                        metadataFilename = filename.replace('.xml', '')
621                               
622                       
623                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
624                       
625                        #NOTE as of 120911 took out provision for DIF ingestion..
626                        if format == 'DIF_9.4':
627                                logging.error("DIF ingestion no longer supported into NERC DWS!!!")
628                                sys.exit(2)
629                       
630                        else:                           
631                                isoFormat = format
632                               
633                                #perform schematron validation of input file...
634                                #xmlValidation = self.SchematronValidate.validate(originals_dir + filename,True)
635                               
636                               
637                                self.originalXMLdoc = "null"
638                               
639                               
640                                #generate the ISO object from the ISO xml..                     
641                                                                                       
642                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
643                               
644                                if isoDataModel.createISOdataStructure() is True:
645                                                logging.info("ISO extractor worked fine!")
646                                               
647                                                #Back to main stream of ISO ingest
648                                                if self._NDG_dataProvider:
649                                                               
650                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
651                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
652                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
653                               
654                                                else:
655                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)     
656                                                        ident = isoDataModel.datasetID[mappingIndex][0]
657                                                       
658                                                        ident = ident.replace(":", "-")
659                                                        ident = ident.replace("/", "-")
660                               
661                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
662                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
663                               
664                                                #now create this stub ISO file on system so can access it
665                                                self.genIsoFile = discovery_dir + new_filename_short   
666                                                                               
667                                               
668                                                #get the converted ISO into a local var
669                                                try:
670                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
671                                                       
672                                                        #01/02/11: Put in a workaround for MEDIN - need original XML with ALL attributes.  Not doing a NDG redirect, so take original file if this is the case
673                                                        if not self.changeUrls:
674                                                                logging.info("No NDG redirection required so using original XML as is..")
675                                                                self.isoXML = file(original_filename).read()
676                                                                self.originalXMLdoc = self.isoXML
677                                                        else:
678                                                                #original method...     
679                                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
680                                                                               
681                                                                self.isoXML = file(self.genIsoFile).read()
682                                                                self.originalXMLdoc = self.isoXML
683                                                       
684                                                        isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
685                                                       
686                                                       
687                                                except:
688                                                        logging.warn("Could not extract converted ISO xml to local variable!")
689                                                        self.isoXML = ''
690                                                       
691                                                logging.info("original file = " + original_filename)
692                                                logging.info("newfile = " + new_filename)
693                               
694                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
695                                                #this links a derived filename in processing dir with original filename
696                                                #get basic filename from the path+filename passed to this function to use as key               
697                                                self.inputFileOrigFinal[new_filename_short]=filename
698                                               
699                                                numfilesproc += 1
700                                                                                               
701                                elif isoDataModel.createISOdataStructure() is True:
702                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
703                                                #sys.exit() # TODO fix these
704                                       
705                                else:                                   
706                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
707                                        badXMLfiles += 1
708                                        self.badXMLfileList[new_filename_short]=filename
709                               
710                       
711                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
712                       
713                logging.info("File renaming and converting completed")
714                logging.info(self.lineSeparator)
715               
716                       
717                return numfilesproc,badXMLfiles
718       
719       
720        def _getPostgresDBConnection(self):
721                '''
722                Get the default postgres DB connection - by reading in data from the db config file
723                '''
724               
725                logging.debug("Setting up connection to postgres DB")
726                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
727               
728               
729                self.pgc = pgc(configFile = self._databaseConfigurationFile)
730                logging.info("Postgres DB connection now set up")
731               
732               
733       
734        def _getPostgresDBConnectionLogging(self):
735                '''
736                Get the default postgres DB connection - by reading in data from the db config file
737                '''
738               
739                logging.debug("Setting up connection to postgres DB")
740                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
741               
742                self.pgc_IngestLog = pgc(configFile = self.ingestLoggingDatabaseConfigurationFile)
743                logging.info("Postgres DB connection now set up")
744
745
746
747        def     _backupAndCleanData(self):
748                '''
749                Back up ingested data for specified data centre, then clean any of
750                the ingest dirs
751                '''
752                logging.info("Backing up ingested data")
753                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
754                  strftime("%y%m%d_%H%M") + "_originals/"
755               
756                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
757                logging.info("Data backed up - now clearing ingest directories")
758                #Clear out the original harvest records area and discovery dir
759                #FileUtilities.cleanDir(self.originals_dir)
760                #FileUtilities.cleanDir(self.discovery_dir)
761                logging.info("Ingest directories cleared")
762
763
764        def     _setupDataCentreDirs(self):
765                '''
766                Set up directories appropriate for the current data centre
767                '''
768                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
769               
770               
771                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
772               
773                # the following dirs define where the specific documents should go
774                self.originals_dir = data_dir + "/oai/originals/" # copy of original
775                self.discovery_dir = data_dir + "/discovery/"
776                self.ingestReport_dir = data_dir + "/ingestReports/"
777               
778                # Create/clear the 'in' and 'out' directories
779                FileUtilities.setUpDir(self.originals_dir)
780                FileUtilities.setUpDir(self.discovery_dir)
781                FileUtilities.setUpDir(self.ingestReport_dir)
782               
783                logging.info("Ingest directories for data centre set up")
784
785
786        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
787                '''
788                Convert files from originals dir to discovery one, then
789                ingest, backup and clean
790                @param originals_dir: directory to ingest docs from
791                @param discovery_dir: directory to use to process ingested docs
792                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
793                '''
794               
795                self.thisIngestStartDate = str(datetime.datetime.now())
796               
797                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
798               
799                filenames = os.listdir(self.discovery_dir)
800               
801                ingestProbMsg = ""
802                ingestWarnMsg = ""
803                               
804                #generate a list of files ALREADY in database so can compare with what has been ingested
805                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
806                               
807                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
808                filesPresentList=[]
809       
810               
811                if filePresentListArr:
812                        for fileArr in filePresentListArr:
813                                filesPresentList.append(fileArr[0])
814                               
815                                #TODO - is above relevant - duplicating functionlaity?
816       
817                #create list to to hold files ingest failed on.
818                self.updateFailList = []
819                self.deletedFailList = []
820                self.problemMessageList = {}   
821                self.warnMessageList = {}
822               
823                counter = 1
824                warningCounter = 0
825                               
826                for filename in filenames:
827                       
828                        fullPath = self.discovery_dir + filename
829                       
830                        #if process id has been supplied, then update the logging db with a "start_ingest"
831                        if self.ingestProcessID is not None and self.procID is None:
832                                sqlStatusCmd = "select update_ingest_info (%s, ' (ingesting %s of %s)');" %(self.ingestProcessID,counter,numfilesproc)                 
833                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
834                       
835                       
836                        if os.path.isfile(fullPath):
837                                       
838                                thisIngestedID = self.addFileToPostgresDB(fullPath)
839                               
840                                #record all problem messages
841                                #ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
842                                if self.ingestProblemMessage != '':
843                                        ingestProbMsg = ingestProbMsg + self.ingestProblemMessage                                       
844                                        self.problemMessageList[self.originalsDiscoveryFilesMap[filename]] = self.ingestProblemMessage
845                                       
846                                else:
847                                       
848                                        #at this point, check to see if record can be aggregated for publishing to CSW?
849                                        if self._PublishToCSW == "True":
850                                               
851                                                #copy file to specified aggregation directory
852                                                FileUtilities.copyFileToDir(fullPath, self._AggregateDir)
853                                       
854                                if self.ingestWarningMessage != '':
855                                        warningCounter += 1
856                                        ingestWarnMsg = ingestWarnMsg + self.ingestWarningMessage
857                                        self.warnMessageList[filename] = self.ingestWarningMessage
858                                       
859                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
860                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
861                                        if thisIngestedID in filesPresentList: 
862                                                filesPresentList.remove(thisIngestedID)
863                       
864                       
865                        counter += 1   
866               
867                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
868                #will need to be removed.
869               
870                #only do this if not in single file mode (or else wverything else gets deleted!)               
871                if ((self.indFileToIngest == "") & (feed)):
872                        for item in filesPresentList:
873                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
874                                DeleteRecord(item)
875                                self.deletedFailList.append(item)
876                                self._no_files_deleted += 1
877                       
878                       
879                self._backupAndCleanData()
880               
881                #at this stage put the reporting code in (will work for oai or atom feed)
882                #create a summary file for each data centre ingest
883                data_dir = self._base_dir + "data/"
884                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
885                recOpFile = open(recOpFileName,'w')
886               
887                logging.info("oai_document_ingest processing complete:")
888               
889                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
890                self.thisIngestEndDate = str(datetime.datetime.now())
891               
892                messageSeparator = "\n\n********************************************************************************"
893               
894                message = "Ingest report for data centre: " + datacentre + "\n"
895                message = message + "Ingest date: " + self.thisIngestEndDate + "\n"
896                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
897                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
898                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
899                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
900                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
901                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
902                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
903               
904                if warningCounter > 0:
905                        message = message + messageSeparator
906                        message = message + "\nINGEST WARNINGS:  " + str(warningCounter) + "\n"
907                       
908                        message = message +  "\nBelow are the files that had ingest warnings...\n\n"
909                       
910                        for warnFile in self.warnMessageList.keys():
911                                message = message + "\nWARNING_FILE " + self.originalsDiscoveryFilesMap[warnFile] + "    " + self.warnMessageList[warnFile] + "\n"
912                       
913               
914                if self._no_problem_files > 0: 
915                        message = message + messageSeparator
916                        message = message + "\nPROBLEM_FILES: " + str(self._no_problem_files)  + "\n"
917                       
918               
919                if len(self.updateFailList) > 0:
920                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
921                                       
922                        for badFile in self.updateFailList:
923                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
924                               
925                                #NOTE: there may not be a problem message for every file that failed to ingest..
926                                if badFile in self.problemMessageList.keys():
927                                        message = message +"\nPROBLEM_FILE " + badFile + "\n"
928                                        message = message + self.problemMessageList[badFile] + "\n"
929                               
930               
931                if len(self.badXMLfileList.keys()) > 0:
932                        message = message + messageSeparator
933                        message = message +  "\nBelow are the files that could not be parsed (i.e. bad XML?)...\n\n"
934                        for badXmlFile in self.badXMLfileList.values():
935                                message = message + "BAD_XML_FILE: " + badXmlFile +"\n"
936               
937                                               
938                #log processing results to ingestLogging table if required to           
939               
940                if self.ingestProcessID is not None:
941                        logging.info("Updating ingest logging database")
942                                               
943                        sqlLoggingCmd = "select add_ingest_stats(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s');"%(self.ingestProcessID,self.thisIngestStartDate,self.thisIngestEndDate,numfilesproc+badXmlFiles,numfilesproc,badXmlFiles,self._no_files_ingested,self._no_files_changed,self._no_files_deleted,self._no_problem_files)
944                                       
945                        #update ingestLogging                   
946                        self.pgc_IngestLog.runSQLCommand(sqlLoggingCmd)
947                       
948                        #now update problem file log if there are problem files
949                        if self._no_problem_files > 0:
950                                logging.info("There are %s problem files - recording these in stats logging db" %self._no_problem_files)
951                               
952                                #note sometimes message will not always be recorded, so just record name of file
953                                if len(self.problemMessageList.keys()) < 1:
954                                       
955                                        for badFile in self.updateFailList:
956                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,'Could not catch precise error for this file!!')
957                               
958                                                #update ingestLogging           
959                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)
960                               
961                                else:
962                                        for badFile in self.problemMessageList.keys():                                 
963                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,self.problemMessageList[badFile])
964                                                       
965                                                #update ingestLogging                   
966                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)               
967                               
968                                       
969                        #if process id has been supplied, then update the logging db with a "start_ingest" (only for DPWS ops)
970                        if self.procID is None:
971                                sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
972                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
973                       
974                #update with content from the problem messages recorded
975                #message = message + ingestProbMsg + "\n"               
976                         
977                recOpFile.write(message)
978                               
979                return numfilesproc,badXmlFiles, message
980
981
982
983        def _setupXQueries(self):
984                '''
985                Set up the required XQueries
986                - NB, extract the xquery libraries locally for easy reference
987                '''
988                self._xq = ndgResources()
989                for libFile in self._xq.xqlib:
990                        # NB, we don't want the full path to the files - just the filename
991                        fileName = libFile.split('/')[-1]
992                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.