source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 7300

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@7300
Revision 7300, 34.3 KB checked in by sdonegan, 10 years ago (diff)

various updates to ingest (note change of name of main class) - can be called from dpws api now with just process id, or indivudual or run_all_ingest

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.ERROR
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False         
68                checkProcID = False
69                for arg in args:
70                       
71                        keywordArgs=['ingestFromDate','individualFile','interval','ingestFromDate',\
72                                                'eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile', \
73                                                'ingestProcessID']
74                                               
75                       
76                        bits = arg.split('=')
77                        if len(bits) == 2:
78                                if bits[0] == keywordArgs[3]:
79                                        self.setIngestFromDate(bits[1])
80                                elif bits[0] == keywordArgs[2]:
81                                        self.setPollInterval(bits[1])
82                                elif bits[0] == keywordArgs[7]:
83                                        print " - Running with specified config file!"
84                                        self.setOaiConfigFile(bits[1])
85                                        checkConfInArgs = True
86                                elif bits[0] == keywordArgs[1]:
87                                        print " - Running in single file ingestion mode!"
88                                        self.setIndFileToIngest(bits[1])
89                                elif bits[0] == keywordArgs[8]:
90                                        print " - WARNING: will log results of this ingestion process into the specified logging database! (NOTE: will also take location of metadata from database as this option intended for DPWS!!)"
91                                        self.setIngestLogID(bits[1])
92                                        checkProcID = True                             
93                                elif bits[0] not in keywordArgs:
94                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
95                                        sys.exit(2)
96                                else:
97                                        setattr(self, bits[0], bits[1])
98                       
99                if checkConfInArgs is False:
100                        print "\nWARNING: Ingest config file has not been specified. Using local default"
101                        self.setOaiConfigFile('./oai_document_ingester.config')
102                       
103               
104               
105                print self.lineSeparator
106               
107                # NB, this is a slight fudge as cannot get the detailed logging to work
108                # without setting up a new logger here - which means we get two loggers
109                # outputing data. The initial call to logging needs to be tracked down
110                # and configured correctly, so this can be avoided...
111#               self.logger = logging.getLogger()
112#               self.logger.setLevel(loggingLevel)
113
114                # create console handler and set level to debug
115#               ch = logging.StreamHandler()
116#               ch.setLevel(loggingLevel)
117                # create formatter
118#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
119                # add formatter to ch
120#               ch.setFormatter(formatter)
121                # add ch to logger
122#               self.logger.addHandler(ch)
123               
124                return args
125
126
127        def getID(self, filename):
128                '''
129                Gets the identifier out of an input metadata xml record.
130                @param filename - name of document file being processed
131                @return: ID - id to use to refer to the document
132                '''
133                logging.info("Retrieving identifier for metadata record " + filename)
134       
135                xml=file(filename).read()
136               
137                ID = idget(xml)
138               
139                return ID
140       
141       
142        def addFileToPostgresDB(self, filename):
143                '''
144                Add a file to the postgres DB - extracting and storing all the required
145                data in the process
146                @param filename: full path of file to add to postgres DB
147                '''
148                logging.info("Adding file, " + filename + ", to postgres DB")
149               
150                numSlash = len(filename.split('/'))
151                               
152                shortFilename = filename.split('/')[numSlash - 1]
153               
154                self.recordAttemptToIngestDiscoveryID = ""
155               
156                self.ingestProblemMessage = ""
157               
158                #need to generate a local version of the iso data model (first call was to run through all available files
159                #and convert & place into relevant dirs as necessary
160                #generate the ISO object from the converted ISO here now..
161               
162                try:
163                                                                                                                       
164                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
165                       
166                                                       
167                        if self.isoDataModel.createISOdataStructure() is True:
168                                logging.info("ISO extractor worked fine!")
169                       
170                       
171                        elif self.isoDataModel.createISOdataStructure() is True:
172                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
173                                                #sys.exit() # TODO fix these
174                                return
175                        else:
176                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
177                                                #sys.exit()
178                                return
179                except:
180                        logging.error("Could not generate ISO XML!")
181                        return
182               
183                #get the converted ISO into a local var
184                try:
185                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
186                        self.isoXML = file(filename).read()
187                        self.originalXMLdoc = self.isoXML
188                                       
189                except:
190                        logging.warn("Could not extract converted ISO xml to local variable!")
191                        self.isoXML = ''
192                               
193               
194                # first of all create a PostgresRecord - this object represents all the data required
195                # for a DB entry
196                dao = None
197               
198                try:
199                       
200                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
201                       
202                        #record whats bout to be ingested - if a problem this, error should already have been kicked out                                       
203                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
204                       
205                        if self.isoDataModel.validDoc is True:                 
206                               
207                               
208                                #record whats attempting to be ingested
209                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
210                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
211                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML, difXML = self.difXML, isoXML = self.isoXML)                   
212                               
213                                # Now create the data access object to interface to the DB                     
214                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
215                       
216                                # Finally, write the new record
217                                # 0 if failed, 1 if update, 2 if create
218                                returnCode = dao.createOrUpdateRecord() 
219                                               
220                                if returnCode == 2:
221                                        self._no_files_ingested += 1
222                               
223                                elif returnCode == 1:
224                                        self._no_files_changed += 1
225                                       
226                        else:
227                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
228                               
229                                logging.warn(self.ingestProblemMessage)
230                               
231                                #now raise error so can record this
232                                raise ValueError,self.ingestProblemMessage
233                       
234                except:
235                       
236                       
237                        #if error encountered, add to failure lisr
238                        logging.error("Could not update: " + filename)                 
239                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
240                        self.updateFailList.append(originalRecordFilename)
241                       
242                        logging.error("Exception thrown - detail: ")
243                        errors = sys.exc_info()
244                        logging.error(errors)
245                        self._error_messages += "%s\n" %str(errors[1])
246                       
247                        if dao:
248                                logging.info("Removing record and its associated info from DB")
249                                logging.info("- to allow clean ingestion on rerun")
250                                try:
251                                        dao.deleteOriginalRecord()
252                                except:
253                                        logging.error("Problem encountered when removing record: ")
254                                        logging.error(sys.exc_info())
255                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
256
257                        self._no_problem_files += 1
258                       
259                        logging.info("Continue processing other files")
260                       
261                return self.recordAttemptToIngestDiscoveryID
262                       
263       
264        def getProcessingConfig(self,configFilePath):
265               
266                '''
267                Fed up with messing about with hardcoded directory paths. 
268                This method to get relevant values out of the oai_document_ingester.config file
269               
270                Returns a dictionary of values with keys:
271               
272                #directory in which the code resides
273                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
274
275                #base directory in which metadata is extracted to and converted to
276                base_directory /home/badc/discovery_docs/ingestDocs/
277
278                #directory in which to write reports to
279                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
280
281                #path to the passwords file
282                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
283
284                #datacentre config file directory path
285                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
286                '''
287               
288                # Check this file exists; if not, assume an invalid datacentre has been specified
289               
290                if not os.path.isfile(configFilePath):
291                    sys.exit("ERROR: Could not find the processing config file")
292                   
293                processingConfig = {}
294                   
295                processing_config_file = open(configFilePath, "r")
296               
297                for line in processing_config_file.readlines():
298                        words  = string.split(line)
299                        if len(words) == 0:
300                                continue
301                        elif words[0] == 'code_directory':
302                                processingConfig['code_directory'] = words[1]
303                                                                                               
304                        elif words[0] == 'base_directory':
305                                processingConfig['base_directory'] = words[1]
306                               
307                        elif words[0] == 'reporting_directory':
308                                processingConfig['reporting_directory'] = words[1]
309                               
310                        elif words[0] == 'passwords_file':
311                                processingConfig['passwords_file'] = words[1]
312                               
313                        elif words[0] == 'datcentre_configs':
314                                processingConfig['datcentre_configs'] = words[1]
315                                       
316                        elif words[0] == 'changeURLs':
317                                processingConfig['changeURLs'] = words[1]
318                               
319                        elif words[0] == 'NDG_redirect_URL':
320                                processingConfig['NDG_redirect_URL'] = words[1]
321                               
322                        elif words[0] == 'ingestLoggingConfig':
323                                processingConfig['ingestLoggingConfig'] = words[1]
324                               
325                        elif words[0] == 'ingestConfig':
326                                if not os.path.isfile(words[1]):
327                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
328                                else:
329                                        processingConfig['ingestConfig'] = words[1]
330                               
331                        elif words[0] == 'acceptedFormats':
332                                processingConfig['acceptedFormats'] = words[1]
333                               
334                        elif words[0] == 'saxonJarFile':
335                                if not os.path.isfile(words[1]):
336                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
337                                else:
338                                        processingConfig['saxonJarFile'] = words[1]
339                        elif words[0] == 'xpathIsoClass':
340                               
341                                #build a dictionary with format as the key                             
342                                isoClass = words[1].split(":")                         
343                               
344                                if 'xpathIsoClass' not in processingConfig.keys():
345                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
346                                else:
347                                        temp = processingConfig['xpathIsoClass']                                       
348                                        temp.update({isoClass[0]:isoClass[1]})
349                                        processingConfig['xpathIsoClass'] = temp
350                               
351                        elif words[0] == 'xqueryStubIsoGen':
352                               
353                                #build a dictionary with format as the key
354                                isoClass = words[1].split(":")
355                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
356                               
357                        elif words[0] == 'xqueryConversions':
358                               
359                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
360                                processingConfig['xqueryConversions'] = words[1].split(",")
361                               
362                        elif words[0] == 'currentMEDIN':
363                               
364                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
365                                processingConfig['currentMEDIN'] = words[1]
366                               
367                        elif words[0] == 'exceptXqConv':
368                               
369                                #build a dictionary with format as the key
370                                xqClass = words[1].split(":")
371                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
372                               
373                        elif words[0] == 'xqDocTypes':
374                               
375                                #build a dictionary with format as the key
376                                xqClass = words[1].split(":")
377                               
378                                if 'xqDocTypes' not in processingConfig.keys():
379                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
380                                else:
381                                        temp = processingConfig['xqDocTypes']                                   
382                                        temp.update({xqClass[0]:xqClass[1]})
383                                        processingConfig['xqDocTypes'] = temp
384                                       
385                processing_config_file.close()
386                       
387                return processingConfig
388       
389       
390        def getConfigDetails(self, datacentre):
391                '''
392                Get the harvested records directory and groups for this datacentre from the
393                datacentre specific config file.  The harvested records directory depends on the
394                datacentres OAI base url, the set and format. These have to be know up-front.
395                The groups denote which 'portal groups' they belong to - for limiting searches to
396                say NERC-only datacentres records.
397                Groups are added to the intermediate MOLES when it is created.
398                @param datacentre: datacentre to use when looking up config file
399                '''
400                # initialise the variables to retrieve from the config file
401                self._harvest_home = ""
402                self._datacentre_groups = ""
403                self._datacentre_format = ""
404                self._datacentre_namespace = ""
405                self._NDG_dataProvider = False
406                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
407               
408               
409                #note changed to production buildout path
410                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
411               
412                #fed up with this rubbish causing problems - use a normal file opener.
413                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
414               
415               
416                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
417                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
418               
419                # Check this file exists; if not, assume an invalid datacentre has been specified
420                if not os.path.isfile(self._datacentre_config_filename):
421                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
422                        "specified (%s) is invalid\n" %datacentre)
423                   
424               
425                datacentre_config_file = open(self._datacentre_config_filename, "r")
426               
427                for line in datacentre_config_file.readlines():
428                        words  = string.split(line)
429                        if len(words) == 0:
430                                continue
431                        elif words[0] == 'host_path':
432                                self._harvest_home = words[1].rstrip()
433                        elif words[0] == 'groups':
434                                self._datacentre_groups = words[1:]
435                        elif words[0] == 'format':
436                                self._datacentre_format = words[1]
437                        elif words[0] == 'namespace':
438                                self._datacentre_namespace = words[1]
439                        elif words[0] == 'NDG_dataProvider':
440                                self._NDG_dataProvider = True
441                        elif words[0] == 'feed':
442                                self._IngestViaFeed = True
443                       
444                datacentre_config_file.close()
445               
446                #stop ingest on this datacentre if process thread is OAI and the config says FEED
447                if ((self.processThread == 'OAI') and self._IngestViaFeed):
448                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
449                                               
450                if self._harvest_home == "":
451                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
452               
453                logging.info("harvested records are in " + self._harvest_home)
454               
455                if self._datacentre_groups == "":
456                    logging.info("No groups/keywords set for datacentre " + datacentre)
457                else:
458                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
459               
460                if self._datacentre_format == "":
461                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
462               
463                logging.info("format being harvested: " + self._datacentre_format)
464               
465                if self._datacentre_namespace == "":
466                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
467               
468                logging.info("datacentre namespace: " + self._datacentre_namespace)
469               
470                if self._NDG_dataProvider:
471                        logging.info("Datacentre classified as an NDG data provider")
472                else:
473                        logging.info("Datacentre is not classified as an NDG data provider")
474                logging.info(self.lineSeparator)
475               
476               
477        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
478                '''
479                Processes/renames the files (changed 08/01/07 to get id from inside file)
480                 - also replace any namespace declarations with a standard one which we know works in NDG
481                 NB, this copies files from the original dir to the discovery dir
482                 @param originals_dir: directory to convert files from
483                 @param discovery_dir: directory in which converted files will end up
484                 @return numfilesproc: counter of number of files processed
485                '''
486                numfilesproc = 0
487                badXMLfiles = 0         
488               
489                self.inputFileOrigFinal = {}
490                self.badXMLfileList = {}               
491                self.difXML = None
492               
493                logging.info(self.lineSeparator)
494                logging.info("Renaming files:")
495               
496                if self.ingestProcessID is not None and self.procID is None:
497                       
498                        self._getPostgresDBConnectionLogging()
499                                               
500                        #if process id has been supplied, then update the logging db with a "start_ingest"
501                        sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
502                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
503               
504                for filename in os.listdir(originals_dir):
505                       
506                        if not filename.endswith('.xml'):
507                                logging.warning('File %s is not xml format. Not processed'  %(filename))
508                                continue
509                       
510                        original_filename = originals_dir + filename
511                       
512                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
513                        if format == 'DIF_9.4':
514                                                       
515                                logging.info("Converting DIF to stub ISO...")                                   
516                               
517                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
518                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
519                                                                                                               
520                                #gets the metadata id from the xml
521                                metadataID=self.getID(original_filename)
522                                #import pdb
523                                #pdb.set_trace()
524                                                               
525                                repositoryName = self._datacentre_namespace
526                               
527                                #get the name of the file to be used in the xquery
528                                metadataFilename = filename.replace('.xml', '')
529                                                               
530                                #where is the file to be ingested located?
531                                metadataFileLoc = originals_dir
532                               
533                                #generate a new stubISO filename
534                                self.isoFormat= "stubISO"
535                                                               
536                               
537                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
538                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
539                                                                                                                       
540                                #get hold of the xml for the transformed ISO                           
541                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
542                                                               
543                                #create the iso file on the system
544                                tempStubISOfile = original_filename + ".stubISO.xml"
545                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
546                               
547                                #generate the ISO object from the converted ISO here now..
548                                try:
549                                       
550                                        #temp assignment so failure mode supported
551                                        new_filename_short = original_filename
552                                                                       
553                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
554                               
555                                        if isoDataModel.createISOdataStructure() is True:
556                                                logging.info("ISO extractor worked fine!")
557                                               
558                                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
559                                                self.originalXMLdoc = ''
560                               
561                                                try:
562                                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
563                                                        self.originalXMLdoc = file(original_filename).read()
564                                                        self.difXML = self.originalXMLdoc
565                                       
566                                                except:
567                                                        logging.warn("Could not extract original file to local variable!")
568                                                       
569                                               
570                                                #Back to main stream of ISO ingest
571                                                if self._NDG_dataProvider:
572                                       
573                                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
574                                                               
575                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
576                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
577                                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
578                               
579                                       
580                                                else:
581                                                        ident = isoDataModel.datasetID[0][0]
582                                       
583                                                        ident = ident.replace(":", "-")
584                                                        ident = ident.replace("/", "-")
585                               
586                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
587                                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
588                               
589                                       
590                               
591                                                #now create this stub ISO file on system so can access it
592                                                self.genIsoFile = discovery_dir + new_filename_short   
593                                                               
594                                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
595                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
596                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
597                                               
598                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
599                                                #this links a derived filename in processing dir with original filename
600                                                #get basic filename from the path+filename passed to this function to use as key               
601                                                self.inputFileOrigFinal[new_filename_short]=filename
602                                               
603                                                numfilesproc += 1
604                       
605                       
606                                        elif isoDataModel.createISOdataStructure() is True:
607                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
608                                                #sys.exit() # TODO fix these
609                                               
610                                        else:
611                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
612                                                badXMLfiles += 1
613                                                self.badXMLfileList[new_filename_short]=filename
614                                                                       
615                                except:
616                                        logging.error("Could not generate ISO XML from DIF format!!")
617                                        badXMLfiles += 1
618                                        self.badXMLfileList[new_filename_short]=filename
619                               
620                       
621                        #all other ISO formats/profiles to deal with.
622                        else:                           
623                                isoFormat = format
624                               
625                                self.originalXMLdoc = "null"
626                               
627                               
628                                #generate the ISO object from the ISO xml..                                                                             
629                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
630                               
631                                if isoDataModel.createISOdataStructure() is True:
632                                                logging.info("ISO extractor worked fine!")
633                                               
634                                                #Back to main stream of ISO ingest
635                                                if self._NDG_dataProvider:
636                                                               
637                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
638                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
639                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
640                               
641                                                else:
642                                       
643                                                        ident = isoDataModel.datasetID[0][0]
644                               
645                                                        ident = ident.replace(":", "-")
646                                                        ident = ident.replace("/", "-")
647                               
648                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
649                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
650                               
651                                                #now create this stub ISO file on system so can access it
652                                                self.genIsoFile = discovery_dir + new_filename_short   
653                               
654                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
655                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
656                               
657                                                #get the converted ISO into a local var
658                                                try:
659                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
660                                                        self.isoXML = file(self.genIsoFile).read()
661                                                        self.originalXMLdoc = self.isoXML
662                                       
663                                                except:
664                                                        logging.warn("Could not extract converted ISO xml to local variable!")
665                                                        self.isoXML = ''
666                                                       
667                                                logging.info("original file = " + original_filename)
668                                                logging.info("newfile = " + new_filename)
669                               
670                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
671                                                #this links a derived filename in processing dir with original filename
672                                                #get basic filename from the path+filename passed to this function to use as key               
673                                                self.inputFileOrigFinal[new_filename_short]=filename
674                                               
675                                                numfilesproc += 1
676                                                                                               
677                                elif isoDataModel.createISOdataStructure() is True:
678                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
679                                                #sys.exit() # TODO fix these
680                                       
681                                else:                                   
682                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
683                                        badXMLfiles += 1
684                                        self.badXMLfileList[new_filename_short]=filename
685                       
686                logging.info("File renaming and converting completed")
687                logging.info(self.lineSeparator)
688               
689                       
690                return numfilesproc,badXMLfiles
691       
692       
693        def _getPostgresDBConnection(self):
694                '''
695                Get the default postgres DB connection - by reading in data from the db config file
696                '''
697               
698                logging.debug("Setting up connection to postgres DB")
699                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
700               
701               
702                self.pgc = pgc(configFile = self._databaseConfigurationFile)
703                logging.info("Postgres DB connection now set up")
704               
705               
706       
707        def _getPostgresDBConnectionLogging(self):
708                '''
709                Get the default postgres DB connection - by reading in data from the db config file
710                '''
711               
712                logging.debug("Setting up connection to postgres DB")
713                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
714               
715               
716                self.pgc_IngestLog = pgc(configFile = self.ingestLoggingDatabaseConfigurationFile)
717                logging.info("Postgres DB connection now set up")
718
719
720
721        def     _backupAndCleanData(self):
722                '''
723                Back up ingested data for specified data centre, then clean any of
724                the ingest dirs
725                '''
726                logging.info("Backing up ingested data")
727                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
728                  strftime("%y%m%d_%H%M") + "_originals/"
729               
730                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
731                logging.info("Data backed up - now clearing ingest directories")
732                #Clear out the original harvest records area and discovery dir
733                #FileUtilities.cleanDir(self.originals_dir)
734                #FileUtilities.cleanDir(self.discovery_dir)
735                logging.info("Ingest directories cleared")
736
737
738        def     _setupDataCentreDirs(self):
739                '''
740                Set up directories appropriate for the current data centre
741                '''
742                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
743               
744               
745                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
746               
747                # the following dirs define where the specific documents should go
748                self.originals_dir = data_dir + "/oai/originals/" # copy of original
749                self.discovery_dir = data_dir + "/discovery/"
750                #self.stubISO_dir = data_dir + "/stub_iso/"
751               
752                # Create/clear the 'in' and 'out' directories
753                FileUtilities.setUpDir(self.originals_dir)
754                FileUtilities.setUpDir(self.discovery_dir)
755               
756                logging.info("Ingest directories for data centre set up")
757
758
759        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
760                '''
761                Convert files from originals dir to discovery one, then
762                ingest, backup and clean
763                @param originals_dir: directory to ingest docs from
764                @param discovery_dir: directory to use to process ingested docs
765                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
766                '''
767               
768               
769                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
770               
771                filenames = os.listdir(self.discovery_dir)
772               
773                ingestProbMsg = ""
774                               
775                #generate a list of files ALREADY in database so can compare with what has been ingested
776                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
777                               
778                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
779                filesPresentList=[]
780       
781               
782                if filePresentListArr:
783                        for fileArr in filePresentListArr:
784                                filesPresentList.append(fileArr[0])
785                               
786                                #TODO - is above relevant - duplicating functionlaity?
787       
788                #create list to to hold files ingest failed on.
789                self.updateFailList = []
790                self.deletedFailList = []
791                self.problemMessageList = {}                   
792               
793                for filename in filenames:
794                       
795                        fullPath = self.discovery_dir + filename
796                       
797                        if os.path.isfile(fullPath):
798                                       
799                                thisIngestedID = self.addFileToPostgresDB(fullPath)
800                               
801                                #record all problem messages
802                                #ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
803                               
804                                if self.ingestProblemMessage != '':
805                                        ingestProbMsg = ingestProbMsg + self.ingestProblemMessage
806                                       
807                                        self.problemMessageList[filename] = self.ingestProblemMessage
808                                       
809                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
810                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
811                                        if thisIngestedID in filesPresentList: 
812                                                filesPresentList.remove(thisIngestedID)                 
813                               
814                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
815                #will need to be removed.
816               
817                #only do this if not in single file mode (or else wverything else gets deleted!)               
818                if ((self.indFileToIngest == "") & (feed)):
819                        for item in filesPresentList:
820                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
821                                DeleteRecord(item)
822                                self.deletedFailList.append(item)
823                                self._no_files_deleted += 1
824                       
825                       
826                self._backupAndCleanData()
827               
828                #at this stage put the reporting code in (will work for oai or atom feed)
829                #create a summary file for each data centre ingest
830                data_dir = self._base_dir + "data/"
831                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
832                recOpFile = open(recOpFileName,'w')
833               
834                logging.info("oai_document_ingest processing complete:")
835               
836                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
837                self.thisIngestDate = str(datetime.datetime.now())
838               
839                message = "Ingest report for data centre: " + datacentre + "\n"
840                message = message + "Ingest date: " + self.thisIngestDate + "\n"
841                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
842                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
843                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
844                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
845                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
846                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
847                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"           
848                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
849                                               
850                if len(self.updateFailList) > 0:
851                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
852               
853                        for badFile in self.updateFailList:
854                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")     
855                                message = message +"\nPROBLEM_FILE " + badFile + "\n"
856               
857                if len(self.badXMLfileList.keys()) > 0:
858                        message = message +  "\nBelow are the files that could not be parsed (i.e. bad XML?)...\n\n"
859                        for badXmlFile in self.badXMLfileList.values():
860                                message = message + "BAD_XML_FILE " + badXmlFile +"\n"
861               
862                                               
863                #log processing results to ingestLogging table if required to           
864               
865                if self.ingestProcessID is not None:
866                        logging.info("Updating ingest logging database")
867                                               
868                        sqlLoggingCmd = "select add_ingest_stats('%s','%s','%s','%s','%s','%s','%s','%s','%s');"%(self.ingestProcessID,self.thisIngestDate,numfilesproc+badXmlFiles,numfilesproc,badXmlFiles,self._no_files_ingested,self._no_files_changed,self._no_files_deleted,self._no_problem_files)
869                                       
870                        #update ingestLogging                   
871                        self.pgc_IngestLog.runSQLCommand(sqlLoggingCmd)
872                       
873                        #now update problem file log if there are problem files
874                        if self._no_problem_files > 0:
875                                logging.info("There are %s problem files - recording these in stats logging db" %self._no_problem_files)
876                               
877                                #note sometimes message will not always be recorded, so just record name of file
878                                if len(self.problemMessageList.keys()) < 1:
879                                       
880                                        for badFile in self.updateFailList:
881                                                sqlProbFileCmd = "select add_problem_file('%s','%s','%s');"%(self.ingestProcessID,badFile,'Could not catch precise error for this file!!')
882                               
883                                else:
884                                        for badFile in self.problemMessageList.keys():                                 
885                                                sqlProbFileCmd = "select add_problem_file('%s','%s','%s');"%(self.ingestProcessID,badFile,self.problemMessageList[badFile])
886                                                                       
887                                #update ingestLogging                   
888                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)
889                                       
890                        #if process id has been supplied, then update the logging db with a "start_ingest" (only for DPWS ops)
891                        if self.procID is None:
892                                sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
893                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
894                       
895                #update with content from the problem messages recorded
896                message = message + ingestProbMsg + "\n"               
897                         
898                recOpFile.write(message)
899                               
900                return numfilesproc,badXmlFiles, message
901
902
903
904        def _setupXQueries(self):
905                '''
906                Set up the required XQueries
907                - NB, extract the xquery libraries locally for easy reference
908                '''
909                self._xq = ndgResources()
910                for libFile in self._xq.xqlib:
911                        # NB, we don't want the full path to the files - just the filename
912                        fileName = libFile.split('/')[-1]
913                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.