source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 8000

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@8000
Revision 8000, 37.9 KB checked in by sdonegan, 8 years ago (diff)

Updated bad file handling and reporting

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24from SchematronValidate import SchematronValidate as Schematron
25
26class SqlException(Exception):pass
27
28class AbstractDocumentIngester(object):
29        '''
30        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
31        - including running the various transforms and parsings to get all doc types and spatiotemporal
32        data in the correct form in the DB
33        '''
34        lineSeparator = "------------------------------"
35               
36        # The directory to put things for a tape backup (should already exist)
37        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
38       
39        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
40               
41        def _setupCmdLineOptions(self):
42                '''
43                Determine the logging level to use and configure this appropriately
44                @return args: any input arguments - excluding options
45                '''
46               
47                # check for verbose option
48                try:
49                        opts, args = getopt.getopt(sys.argv[1:], "vd")
50                except getopt.GetoptError, err:
51                    # print help information and exit:
52                    print str(err) # will print something like "option -a not recognized"
53                    sys.exit(2)
54                   
55                if len(args) < 1:
56                        self.usage()
57                   
58                loggingLevel = logging.ERROR
59                for o, a in opts:
60                    if o == "-v":
61                        print " - Verbose mode ON"
62                        loggingLevel = logging.INFO
63                    elif o == "-d":
64                        print " - Debug mode ON"
65                        loggingLevel = logging.DEBUG
66                   
67               
68                # set up any keywords on the object
69                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
70                checkConfInArgs = False         
71                checkProcID = False
72               
73                for arg in args:
74                       
75                       
76                        keywordArgs=['ingestFromDate','individualFile','interval','ingestFromDate',\
77                                                'eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile', \
78                                                'ingestProcessID','useLogFile']
79                                               
80                       
81                        bits = arg.split('=')
82                        if len(bits) == 2:
83                                if bits[0] == keywordArgs[3]:
84                                        self.setIngestFromDate(bits[1])
85                                elif bits[0] == keywordArgs[2]:
86                                        self.setPollInterval(bits[1])
87                                elif bits[0] == keywordArgs[7]:
88                                        print " - Running with specified config file!"
89                                        self.setOaiConfigFile(bits[1])
90                                        checkConfInArgs = True
91                                elif bits[0] == keywordArgs[1]:
92                                        print " - Running in single file ingestion mode!"
93                                        self.setIndFileToIngest(bits[1])
94                                elif bits[0] == keywordArgs[8]:
95                                        print " - WARNING: will log results of this ingestion process into the specified logging database! (NOTE: will also take location of metadata from database as this option intended for DPWS!!)"
96                                        self.setIngestLogID(bits[1])
97                                        checkProcID = True
98                                       
99                                elif bits[0] not in keywordArgs:
100                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
101                                        sys.exit(2)
102                                else:
103                                        setattr(self, bits[0], bits[1])
104                       
105                if checkConfInArgs is False:
106                        print "\nWARNING: Ingest config file has not been specified. Now MANDATORY!"
107                        sys.exit(2)
108                       
109                print self.lineSeparator
110               
111                # NB, this is a slight fudge as cannot get the detailed logging to work
112                # without setting up a new logger here - which means we get two loggers
113                # outputing data. The initial call to logging needs to be tracked down
114                # and configured correctly, so this can be avoided...
115#               self.logger = logging.getLogger()
116#               self.logger.setLevel(loggingLevel)
117
118                # create console handler and set level to debug
119#               ch = logging.StreamHandler()
120#               ch.setLevel(loggingLevel)
121                # create formatter
122#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
123                # add formatter to ch
124#               ch.setFormatter(formatter)
125                # add ch to logger
126#               self.logger.addHandler(ch)
127               
128                return args
129
130
131        def getID(self, filename):
132                '''
133                Gets the identifier out of an input metadata xml record.
134                @param filename - name of document file being processed
135                @return: ID - id to use to refer to the document
136                '''
137                logging.info("Retrieving identifier for metadata record " + filename)
138       
139                xml=file(filename).read()
140               
141                ID = idget(xml)
142               
143                return ID
144       
145       
146        def addFileToPostgresDB(self, filename):
147                '''
148                Add a file to the postgres DB - extracting and storing all the required
149                data in the process
150                @param filename: full path of file to add to postgres DB
151                '''
152                logging.info("Adding file, " + filename + ", to postgres DB")
153               
154                numSlash = len(filename.split('/'))
155                               
156                shortFilename = filename.split('/')[numSlash - 1]
157               
158                self.recordAttemptToIngestDiscoveryID = ""
159                self.ingestWarningMessage = ""
160               
161                self.ingestProblemMessage = ""
162               
163                #need to generate a local version of the iso data model (first call was to run through all available files
164                #and convert & place into relevant dirs as necessary
165                #generate the ISO object from the converted ISO here now..
166               
167                try:
168                                                                                                                                       
169                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
170                       
171                                                       
172                        if self.isoDataModel.createISOdataStructure() is True:
173                                logging.info("ISO extractor worked fine!")
174                       
175                       
176                        elif self.isoDataModel.createISOdataStructure() is True:
177                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
178                                                #sys.exit() # TODO fix these
179                                return
180                        else:
181                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
182                                                #sys.exit()
183                                return
184                except:
185                        logging.error("Could not generate ISO XML!")
186                        return
187               
188                #get the converted ISO into a local var
189                try:
190                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")                   
191                        #self.isoXML = file(filename).read()
192                        #self.originalXMLdoc = self.isoXML
193                       
194                        if not self.changeUrls:
195                                logging.info("No NDG redirection required so using original XML as is..")
196                                self.isoXML = file(filename).read()
197                                self.originalXMLdoc = self.isoXML
198                        else:
199                                #original method...                                                     
200                                self.isoXML = file(self.genIsoFile).read()
201                                self.originalXMLdoc = self.isoXML
202                                       
203                except:
204                        logging.warn("Could not extract converted ISO xml to local variable!")
205                        self.isoXML = ''
206                               
207               
208                # first of all create a PostgresRecord - this object represents all the data required
209                # for a DB entry
210                dao = None
211               
212                try:
213                        #import pdb
214                        #pdb.set_trace()
215                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
216                       
217                        #record whats bout to be ingested - if a problem this, error should already have been kicked out
218                        mappingIndex = self.isoDataModel.findTheListData(self.isoDataModel.datasetID)                                   
219                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[mappingIndex][0]                   
220                       
221                        if self.isoDataModel.validDoc is True:                 
222                               
223                                #record whats attempting to be ingested
224                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
225                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
226                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML, difXML = self.difXML, isoXML = self.isoXML)                   
227                               
228                                #check whether NERC keyword is in the scope - if not- REJECT!                           
229                                if 'NERCUNDERSCOREDDC' not in record.getScopeInfo():
230                                        logging.error("No NERC keyword present")
231                                        raise ValueError ("NERC keyword is NOT present in record!")
232                               
233                                # Now create the data access object to interface to the DB                     
234                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc, dpwsID = self._DPWS_ID )
235                       
236                                # Finally, write the new record
237                                # 0 if failed, 1 if update, 2 if create                         
238                                returnCode, msg = dao.createOrUpdateRecord()
239                               
240                                if returnCode == 0:
241                                        logging.error("Detected problem on update: msg")
242                                       
243                                        #most likely problem is a character encoding problem.. to get around this (& t be sure) sort the encoding of filename to be on the safe side:
244                                        datasetSafeName,cnvMsg = record.characterEncoding(self.isoDataModel.datasetName[0][0])
245                                       
246                                        self.ingestProblemMessage= "%s (Content problems: %s)" %(datasetSafeName,msg)
247                                        #import pdb; pdb.set_trace()
248                                        raise ValueError,self.ingestProblemMessage
249                                        #raise SqlException()
250                                                               
251                                if dao.processingWarning != '':
252                                        self.ingestWarningMessage = dao.processingWarning
253                                                                               
254                                if returnCode == 2:
255                                        self._no_files_ingested += 1
256                               
257                                elif returnCode == 1:
258                                        self._no_files_changed += 1
259                                       
260                        else:
261                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
262                               
263                                logging.warn(self.ingestProblemMessage)
264                               
265                                #now raise error so can record this
266                                raise ValueError,self.ingestProblemMessage
267                       
268                except Exception:
269                       
270                        #if error encountered, add to failure lisr
271                        logging.error("Could not update: " + filename)                 
272                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
273                        self.updateFailList.append(originalRecordFilename)
274                       
275                       
276                        logging.error("Exception thrown - detail: ")
277                        errors = sys.exc_info()
278                        logging.error(errors)
279                        self._error_messages += "%s\n" %str(errors[1])
280                       
281                        if dao:
282                                logging.info("Removing record and its associated info from DB")
283                                logging.info("- to allow clean ingestion on rerun")
284                                try:
285                                        dao.deleteOriginalRecord()
286                                except:
287                                        logging.error("Problem encountered when removing record: ")
288                                        logging.error(sys.exc_info())
289                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
290                        #import pdb;
291                        #pdb.set_trace()
292                        self._no_problem_files += 1
293                       
294                        logging.info("Continue processing other files")
295                       
296                return self.recordAttemptToIngestDiscoveryID
297                       
298       
299        def getProcessingConfig(self,configFilePath):
300
301                # Check this file exists; if not, assume an invalid datacentre has been specified               
302                if not os.path.isfile(configFilePath):
303                    sys.exit("ERROR: Could not find the processing config file")
304                   
305                processingConfig = {}
306                   
307                processing_config_file = open(configFilePath, "r")
308               
309                for line in processing_config_file.readlines():
310                        words  = string.split(line)
311                        if len(words) == 0:
312                                continue
313                        elif words[0] == 'code_directory':
314                                processingConfig['code_directory'] = words[1]
315                                                                                               
316                        elif words[0] == 'base_directory':
317                                processingConfig['base_directory'] = words[1]
318                               
319                        elif words[0] == 'reporting_directory':
320                                processingConfig['reporting_directory'] = words[1]
321                               
322                        elif words[0] == 'passwords_file':
323                                processingConfig['passwords_file'] = words[1]
324                               
325                        elif words[0] == 'datcentre_configs':
326                                processingConfig['datcentre_configs'] = words[1]
327                                       
328                        elif words[0] == 'changeURLs':
329                                processingConfig['changeURLs'] = words[1]
330                               
331                        elif words[0] == 'NDG_redirect_URL':
332                                processingConfig['NDG_redirect_URL'] = words[1]
333                               
334                        elif words[0] == 'ingestLoggingConfig':
335                                processingConfig['ingestLoggingConfig'] = words[1]
336                               
337                        elif words[0] == 'ingestConfig':
338                                if not os.path.isfile(words[1]):
339                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
340                                else:
341                                        processingConfig['ingestConfig'] = words[1]
342                               
343                        elif words[0] == 'acceptedFormats':
344                                processingConfig['acceptedFormats'] = words[1]
345                               
346                        elif words[0] == 'saxonJarFile':
347                                if not os.path.isfile(words[1]):
348                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
349                                else:
350                                        processingConfig['saxonJarFile'] = words[1]
351                        elif words[0] == 'xpathIsoClass':
352                               
353                                #build a dictionary with format as the key                             
354                                isoClass = words[1].split(":")                         
355                               
356                                if 'xpathIsoClass' not in processingConfig.keys():
357                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
358                                else:
359                                        temp = processingConfig['xpathIsoClass']                                       
360                                        temp.update({isoClass[0]:isoClass[1]})
361                                        processingConfig['xpathIsoClass'] = temp
362                               
363                        elif words[0] == 'xqueryStubIsoGen':
364                               
365                                #build a dictionary with format as the key
366                                isoClass = words[1].split(":")
367                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
368                               
369                        elif words[0] == 'xqueryConversions':
370                               
371                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
372                                processingConfig['xqueryConversions'] = words[1].split(",")
373                               
374                        elif words[0] == 'currentMEDIN':
375                               
376                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
377                                processingConfig['currentMEDIN'] = words[1]
378                               
379                        elif words[0] == 'schematronLocation':
380                               
381                                #setup parameters for schematron validation
382                                processingConfig['schematronLocation'] = words[1]
383                               
384                        elif words[0] == 'schematronXSLloc':
385                               
386                                #setup parameters for schematron validation
387                                processingConfig['schematronXSLloc'] = words[1]
388                               
389                        elif words[0] == 'schematronValFail':
390                               
391                                #setup parameters for schematron validation
392                                processingConfig['schematronValFail'] = words[1]
393                               
394                        elif words[0] == 'exceptXqConv':
395                               
396                                #build a dictionary with format as the key
397                                xqClass = words[1].split(":")
398                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
399                               
400                        elif words[0] == 'xqDocTypes':
401                               
402                                #build a dictionary with format as the key
403                                xqClass = words[1].split(":")
404                               
405                                if 'xqDocTypes' not in processingConfig.keys():
406                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
407                                else:
408                                        temp = processingConfig['xqDocTypes']                                   
409                                        temp.update({xqClass[0]:xqClass[1]})
410                                        processingConfig['xqDocTypes'] = temp
411                                       
412                processing_config_file.close()
413                       
414                return processingConfig
415       
416       
417        def getConfigDetails(self, datacentre, dpwsID = None):
418                '''
419                Determine whether config details are to be picked up from local config file (for backwards compatibility)
420                or from the DPWS db and set the config params as appropriate
421                @param datacentre: datacentre to use when looking up config file
422                @param dpwsFormat: value extracted from dpws db for data format
423                @param dpwsConfig: resultset containing info from dpws provider_common table           
424                '''             
425                if dpwsID is not None:
426                        self.getConfigDetailsFromDPWS(dpwsID)
427                else:
428                        self.getConfigDetailsFromFile(datacentre)
429                       
430               
431        def getConfigDetailsFromDPWS(self,dpwsID):
432                '''
433                Get the harvested records directory and groups for this datacentre from the
434                datacentre specific DPWS db based on the id.  The harvested records directory depends on the
435                datacentres OAI base url, the set and format. These have to be know up-front.
436                The groups denote which 'portal groups' they belong to - for limiting searches to
437                say NERC-only datacentres records.
438                Groups are added to the intermediate MOLES when it is created.
439                @param datacentre: datacentre to use when looking up config file
440                '''
441               
442                #to start with just use a simple sql cmd until new db is ready and sql function installed               
443                self._DPWS_ID = str(dpwsID)
444               
445                #relic to keep rest of ingester data model happy
446                self._datacentre_groups = ""
447               
448                if self._DPWS_ID == "":
449                        #sys.exit("No DPWS ID entered for this provider!")
450                        raise ValueError("No DPWS ID entered for this provider!")
451                               
452                #now set the relevant variables         
453               
454                #16/11/11 - hardwire this to the NERC format for now - treated differently between CSW and OAI providers in the DPWS db - clarify this with Mau
455                #for when adding MEDIN to DPWS providers?
456                self._datacentre_format = "NERC_DMS_0.7"
457                #try:
458                        #need to also get format held in a different table
459                        #sqlCmd = "select format from oai_provider where common_id = '%s';" %self._DPWS_ID             
460                        #self._datacentre_format = self.pgc_IngestLog.runSQLCommand(sqlCmd)[0][0]
461                        #logging.info("format being harvested: " + self._datacentre_format)             
462                       
463                #except:
464                        #msg = "Could not extract metadata format from DPWS DB!"
465                        #logging.error(msg)
466                        #raise ValueError(msg)
467                       
468                self._datacentre_namespace = ""
469                self._NDG_dataProvider = False
470                self._PublishToCSW = False
471                self._AggregateDir = ""
472               
473                try:
474                        sqlCmd = "select csw_publish_dir,ndg_data_provider,name,data_marker,csw_publish from provider_common where id='%s';" %self._DPWS_ID
475                        configDetails = self.pgc_IngestLog.runSQLCommand(sqlCmd)
476                       
477                        self._datacentre_namespace = configDetails[0][3]
478                        logging.info("datacentre namespace: " + self._datacentre_namespace)             
479                       
480                        if configDetails[0][1] == 'True':
481                                self._NDG_dataProvider = True
482                                logging.info("Datacentre classified as an NDG data provider")
483                        else:
484                                logging.info("Datacentre is not classified as an NDG data provider")           
485                               
486                        if configDetails[0][4] == 'True':
487                                self._PublishToCSW = True
488                                self._AggregateDir = configDetails[0][0]
489                       
490                except:
491                        msg = "Could not extract configuration details from DPWS DB for provide id %s!" %self._DPWS_ID
492                        logging.error(msg)
493                        raise ValueError(msg)
494                               
495                logging.info(self.lineSeparator)
496                       
497               
498        def getConfigDetailsFromFile (self, datacentre):
499                '''
500                Get the harvested records directory and groups for this datacentre from the
501                datacentre specific config file.  The harvested records directory depends on the
502                datacentres OAI base url, the set and format. These have to be know up-front.
503                The groups denote which 'portal groups' they belong to - for limiting searches to
504                say NERC-only datacentres records.
505                Groups are added to the intermediate MOLES when it is created.
506                @param datacentre: datacentre to use when looking up config file
507                '''
508                # initialise the variables to retrieve from the config file
509                self._harvest_home = ""
510                self._datacentre_groups = ""
511                self._datacentre_format = ""
512                self._datacentre_namespace = ""
513                self._NDG_dataProvider = False
514                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
515               
516               
517                #note changed to production buildout path
518                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
519               
520                #fed up with this rubbish causing problems - use a normal file opener.
521                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
522               
523               
524                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
525                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
526               
527                # Check this file exists; if not, assume an invalid datacentre has been specified
528                if not os.path.isfile(self._datacentre_config_filename):
529                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
530                        "specified (%s) is invalid\n" %datacentre)
531                   
532               
533                datacentre_config_file = open(self._datacentre_config_filename, "r")
534               
535                for line in datacentre_config_file.readlines():
536                        words  = string.split(line)
537                        if len(words) == 0:
538                                continue
539                        elif words[0] == 'host_path':
540                                self._harvest_home = words[1].rstrip()
541                        elif words[0] == 'groups':
542                                self._datacentre_groups = words[1:]
543                        elif words[0] == 'format':
544                                self._datacentre_format = words[1]
545                        elif words[0] == 'namespace':
546                                self._datacentre_namespace = words[1]
547                        elif words[0] == 'NDG_dataProvider':
548                                self._NDG_dataProvider = True
549                        elif words[0] == 'feed':
550                                self._IngestViaFeed = True
551                        elif words[0] == 'dpws_id':
552                                self._DPWS_ID = words[1]
553                        elif words[0] == 'csw_publish':
554                                self._PublishToCSW = words[1]
555                        elif words[0] == 'csw_publish_dir':
556                                self._AggregateDir = words[1]
557                       
558                datacentre_config_file.close()
559               
560                #stop ingest on this datacentre if process thread is OAI and the config says FEED
561                if ((self.processThread == 'OAI') and self._IngestViaFeed):
562                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
563                                               
564                if self._harvest_home == "":
565                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
566               
567                logging.info("harvested records are in " + self._harvest_home)
568               
569                if self._datacentre_groups == "":
570                    logging.info("No groups/keywords set for datacentre " + datacentre)
571                else:
572                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
573               
574                if self._datacentre_format == "":
575                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
576               
577                logging.info("format being harvested: " + self._datacentre_format)
578               
579                if self._datacentre_namespace == "":
580                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
581               
582                logging.info("datacentre namespace: " + self._datacentre_namespace)
583               
584                if self._DPWS_ID == "":
585                        sys.exit("No DPWS ID entered for this provider!")
586               
587                if self._NDG_dataProvider:
588                        logging.info("Datacentre classified as an NDG data provider")
589                else:
590                        logging.info("Datacentre is not classified as an NDG data provider")
591                logging.info(self.lineSeparator)
592               
593               
594        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
595                '''
596                Processes/renames the files (changed 08/01/07 to get id from inside file)
597                 - also replace any namespace declarations with a standard one which we know works in NDG
598                 NB, this copies files from the original dir to the discovery dir
599                 @param originals_dir: directory to convert files from
600                 @param discovery_dir: directory in which converted files will end up
601                 @return numfilesproc: counter of number of files processed
602                '''
603               
604                numfilesproc = 0
605                badXMLfiles = 0         
606               
607                self.inputFileOrigFinal = {}
608                self.badXMLfileList = {}               
609                self.difXML = None
610               
611                #create dictionary for a mapping between original files and converted discovery files
612                self.originalsDiscoveryFilesMap = {}
613               
614                logging.info(self.lineSeparator)
615                logging.info("Renaming files plop:")
616               
617               
618                if self.ingestProcessID is not None and self.procID is None:
619                       
620                        self._getPostgresDBConnectionLogging()
621                                               
622                        #if process id has been supplied, then update the logging db with a "start_ingest"
623                        sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
624                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
625                       
626                        sqlStatusCmd = "select update_ingest_info (%s, ' (preprocessing...)');" %(self.ingestProcessID)
627                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
628               
629                for filename in os.listdir(originals_dir):
630                       
631                        if not filename.endswith('.xml'):
632                                logging.warning('File %s is not xml format. Not processed'  %(filename))
633                                continue
634                       
635                        original_filename = originals_dir + filename
636                       
637                        #get the name of the file to be used in the xquery
638                        metadataFilename = filename.replace('.xml', '')
639                               
640                       
641                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
642                       
643                        #NOTE as of 120911 took out provision for DIF ingestion..
644                        if format == 'DIF_9.4':
645                                logging.error("DIF ingestion no longer supported into NERC DWS!!!")
646                                sys.exit(2)
647                       
648                        else:                           
649                                isoFormat = format
650                               
651                                #perform schematron validation of input file...
652                                #xmlValidation = self.SchematronValidate.validate(originals_dir + filename,True)
653                               
654                               
655                                self.originalXMLdoc = "null"
656                               
657                               
658                                #generate the ISO object from the ISO xml..                     
659                                                                                       
660                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
661                               
662                                if isoDataModel.createISOdataStructure() is True:
663                                                logging.info("ISO extractor worked fine!")
664                                               
665                                                #Back to main stream of ISO ingest
666                                                if self._NDG_dataProvider:
667                                                               
668                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
669                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
670                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
671                               
672                                                else:
673                                                        mappingIndex = isoDataModel.findTheListData(isoDataModel.datasetID)     
674                                                        ident = isoDataModel.datasetID[mappingIndex][0]
675                                                       
676                                                        ident = ident.replace(":", "-")
677                                                        ident = ident.replace("/", "-")
678                               
679                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
680                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
681                               
682                                                #now create this stub ISO file on system so can access it
683                                                self.genIsoFile = discovery_dir + new_filename_short   
684                                                                               
685                                               
686                                                #get the converted ISO into a local var
687                                                try:
688                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
689                                                       
690                                                        #01/02/11: Put in a workaround for MEDIN - need original XML with ALL attributes.  Not doing a NDG redirect, so take original file if this is the case
691                                                        if not self.changeUrls:
692                                                                logging.info("No NDG redirection required so using original XML as is..")
693                                                                self.isoXML = file(original_filename).read()
694                                                                self.originalXMLdoc = self.isoXML
695                                                        else:
696                                                                #original method...     
697                                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
698                                                                               
699                                                                self.isoXML = file(self.genIsoFile).read()
700                                                                self.originalXMLdoc = self.isoXML
701                                                       
702                                                        isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
703                                                       
704                                                       
705                                                except:
706                                                        logging.warn("Could not extract converted ISO xml to local variable!")
707                                                        self.isoXML = ''
708                                                       
709                                                logging.info("original file = " + original_filename)
710                                                logging.info("newfile = " + new_filename)
711                               
712                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
713                                                #this links a derived filename in processing dir with original filename
714                                                #get basic filename from the path+filename passed to this function to use as key               
715                                                self.inputFileOrigFinal[new_filename_short]=filename
716                                               
717                                                numfilesproc += 1
718                                                                                               
719                                elif isoDataModel.createISOdataStructure() is True:
720                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
721                                                #sys.exit() # TODO fix these
722                                       
723                                else:                                   
724                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
725                                        badXMLfiles += 1
726                                        self.badXMLfileList[new_filename_short]=filename
727                               
728                       
729                                self.originalsDiscoveryFilesMap[new_filename_short] = filename
730                       
731                logging.info("File renaming and converting completed")
732                logging.info(self.lineSeparator)
733               
734                       
735                return numfilesproc,badXMLfiles
736       
737       
738        def _getPostgresDBConnection(self):
739                '''
740                Get the default postgres DB connection - by reading in data from the db config file
741                '''
742               
743                logging.debug("Setting up connection to postgres DB")
744                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
745               
746               
747                self.pgc = pgc(configFile = self._databaseConfigurationFile)
748                logging.info("Postgres DB connection now set up")
749               
750               
751       
752        def _getPostgresDBConnectionLogging(self):
753                '''
754                Get the default postgres DB connection - by reading in data from the db config file
755                '''
756               
757                logging.debug("Setting up connection to postgres DB")
758                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
759               
760                self.pgc_IngestLog = pgc(configFile = self.ingestLoggingDatabaseConfigurationFile)
761                logging.info("Postgres DB connection now set up")
762
763
764
765        def     _backupAndCleanData(self):
766                '''
767                Back up ingested data for specified data centre, then clean any of
768                the ingest dirs
769                '''
770                logging.info("Backing up ingested data")
771                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
772                  strftime("%y%m%d_%H%M") + "_originals/"
773               
774                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
775                logging.info("Data backed up - now clearing ingest directories")
776                #Clear out the original harvest records area and discovery dir
777                #FileUtilities.cleanDir(self.originals_dir)
778                #FileUtilities.cleanDir(self.discovery_dir)
779                logging.info("Ingest directories cleared")
780
781
782        def     _setupDataCentreDirs(self):
783                '''
784                Set up directories appropriate for the current data centre
785                '''
786                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
787               
788               
789                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
790               
791                # the following dirs define where the specific documents should go
792                self.originals_dir = data_dir + "/oai/originals/" # copy of original
793                self.discovery_dir = data_dir + "/discovery/"
794                self.ingestReport_dir = data_dir + "/ingestReports/"
795               
796                # Create/clear the 'in' and 'out' directories
797                FileUtilities.setUpDir(self.originals_dir)
798                FileUtilities.setUpDir(self.discovery_dir)
799                FileUtilities.setUpDir(self.ingestReport_dir)
800               
801                logging.info("Ingest directories for data centre set up")
802
803
804        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
805                '''
806                Convert files from originals dir to discovery one, then
807                ingest, backup and clean
808                @param originals_dir: directory to ingest docs from
809                @param discovery_dir: directory to use to process ingested docs
810                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
811                '''
812               
813                self.thisIngestStartDate = str(datetime.datetime.now())
814               
815                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
816               
817                filenames = os.listdir(self.discovery_dir)
818               
819                ingestProbMsg = ""
820                ingestWarnMsg = ""
821                               
822                #generate a list of files ALREADY in database so can compare with what has been ingested
823                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
824                               
825                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
826                filesPresentList=[]
827       
828               
829                if filePresentListArr:
830                        for fileArr in filePresentListArr:
831                                filesPresentList.append(fileArr[0])
832                               
833                                #TODO - is above relevant - duplicating functionlaity?
834       
835                #create list to to hold files ingest failed on.
836                self.updateFailList = []
837                self.deletedFailList = []
838                self.problemMessageList = {}   
839                self.warnMessageList = {}
840               
841                counter = 1
842                warningCounter = 0
843                               
844                for filename in filenames:
845                       
846                        fullPath = self.discovery_dir + filename
847                       
848                        #if process id has been supplied, then update the logging db with a "start_ingest"
849                        if self.ingestProcessID is not None and self.procID is None:
850                                sqlStatusCmd = "select update_ingest_info (%s, ' (ingesting %s of %s)');" %(self.ingestProcessID,counter,numfilesproc)                 
851                                                               
852                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
853                       
854                       
855                        if os.path.isfile(fullPath):
856                                       
857                                thisIngestedID = self.addFileToPostgresDB(fullPath)
858                               
859                                #record all problem messages
860                                #ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
861                               
862                                if self.ingestProblemMessage != '':
863                                        ingestProbMsg = ingestProbMsg + self.ingestProblemMessage
864                                        #Need to record datasetname here as well as the filename - filenames are usually useless (i.e. CSW harvest)
865                                        #key = '(%s(%s)' %(self.originalsDiscoveryFilesMap[filename], self.isoDataModel.datasetName[0][0])
866                                        key = self.originalsDiscoveryFilesMap[filename]
867                                        self.problemMessageList[key] = self.ingestProblemMessage
868                                       
869                                else:
870                                       
871                                        #at this point, check to see if record can be aggregated for publishing to CSW?
872                                        if self._PublishToCSW == "True":
873                                               
874                                                #copy file to specified aggregation directory
875                                                FileUtilities.copyFileToDir(fullPath, self._AggregateDir)
876                                       
877                                if self.ingestWarningMessage != '':
878                                        warningCounter += 1
879                                        ingestWarnMsg = ingestWarnMsg + self.ingestWarningMessage
880                                        self.warnMessageList[filename] = self.ingestWarningMessage
881                                       
882                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
883                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
884                                        if thisIngestedID in filesPresentList: 
885                                                filesPresentList.remove(thisIngestedID)
886                       
887                       
888                        counter += 1   
889               
890                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
891                #will need to be removed.
892               
893                #only do this if not in single file mode (or else wverything else gets deleted!)               
894                if ((self.indFileToIngest == "") & (feed)):
895                        for item in filesPresentList:
896                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
897                                DeleteRecord(item)
898                                self.deletedFailList.append(item)
899                                self._no_files_deleted += 1
900                       
901                       
902                self._backupAndCleanData()
903               
904                #at this stage put the reporting code in (will work for oai or atom feed)
905                #create a summary file for each data centre ingest
906                data_dir = self._base_dir + "data/"
907                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
908                recOpFile = open(recOpFileName,'w')
909               
910                logging.info("oai_document_ingest processing complete:")
911               
912                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
913                self.thisIngestEndDate = str(datetime.datetime.now())
914               
915                messageSeparator = "\n\n********************************************************************************"
916               
917                message = "Ingest report for data centre: " + datacentre + "\n"
918                message = message + "Ingest date: " + self.thisIngestEndDate + "\n"
919                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
920                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
921                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
922                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
923                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
924                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
925                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
926               
927                if warningCounter > 0:
928                        message = message + messageSeparator
929                        message = message + "\nINGEST WARNINGS:  " + str(warningCounter) + "\n"
930                       
931                        message = message +  "\nBelow are the files that had ingest warnings...\n\n"
932                       
933                        for warnFile in self.warnMessageList.keys():
934                                message = message + "\nWARNING_FILE " + self.originalsDiscoveryFilesMap[warnFile] + "    " + self.warnMessageList[warnFile] + "\n"
935                       
936               
937                if self._no_problem_files > 0: 
938                        message = message + messageSeparator
939                        message = message + "\nPROBLEM_FILES: " + str(self._no_problem_files)  + "\n"
940                       
941               
942                if len(self.updateFailList) > 0:
943                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
944                       
945                        for badFile in self.problemMessageList.keys(): #self.updateFailList:
946                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
947                               
948                                #NOTE: there may not be a problem message for every file that failed to ingest..
949                                #if badFile in self.problemMessageList.keys():
950                                message = message +"\nPROBLEM_FILE " + badFile + "\n"
951                                message = message + self.problemMessageList[badFile] + "\n\n"
952                               
953               
954                if len(self.badXMLfileList.keys()) > 0:
955                        message = message + messageSeparator
956                        message = message +  "\nBelow are the files that could not be parsed (i.e. bad XML?)...\n\n"
957                        for badXmlFile in self.badXMLfileList.values():
958                                message = message + "BAD_XML_FILE: " + badXmlFile +"\n"
959               
960                                               
961                #log processing results to ingestLogging table if required to           
962               
963                if self.ingestProcessID is not None:
964                        logging.info("Updating ingest logging database")
965                                               
966                        sqlLoggingCmd = "select add_ingest_stats(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s');"%(self.ingestProcessID,self.thisIngestStartDate,self.thisIngestEndDate,numfilesproc+badXmlFiles,numfilesproc,badXmlFiles,self._no_files_ingested,self._no_files_changed,self._no_files_deleted,self._no_problem_files)
967                                       
968                        #update ingestLogging                   
969                        self.pgc_IngestLog.runSQLCommand(sqlLoggingCmd)
970                       
971                        #now update problem file log if there are problem files
972                        if self._no_problem_files > 0:
973                                logging.info("There are %s problem files - recording these in stats logging db" %self._no_problem_files)
974                               
975                               
976                                #note sometimes message will not always be recorded, so just record name of file
977                                if len(self.problemMessageList.keys()) < 1:
978                                       
979                                        for badFile in self.updateFailList:
980                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,'Could not catch precise error for this file!!')
981                               
982                                                #update ingestLogging           
983                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)
984                               
985                                else:
986                                        for badFile in self.problemMessageList.keys():                                 
987                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,self.problemMessageList[badFile])
988                                                       
989                                                #update ingestLogging                   
990                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)               
991                               
992                                       
993                        #if process id has been supplied, then update the logging db with a "start_ingest" (only for DPWS ops)
994                        #import pdb; pdb.set_trace()
995                        if self.procID is None:
996                                sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
997                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
998                       
999                #update with content from the problem messages recorded
1000                #message = message + ingestProbMsg + "\n"               
1001                         
1002                recOpFile.write(message)
1003                               
1004                return numfilesproc,badXmlFiles, message
1005
1006
1007
1008        def _setupXQueries(self):
1009                '''
1010                Set up the required XQueries
1011                - NB, extract the xquery libraries locally for easy reference
1012                '''
1013                self._xq = ndgResources()
1014                for libFile in self._xq.xqlib:
1015                        # NB, we don't want the full path to the files - just the filename
1016                        fileName = libFile.split('/')[-1]
1017                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.