source: TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 7391

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4.3.0/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@7391
Revision 7391, 34.8 KB checked in by sdonegan, 10 years ago (diff)

Records ingest start and end time now

Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.ERROR
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False         
68                checkProcID = False
69                for arg in args:
70                       
71                        keywordArgs=['ingestFromDate','individualFile','interval','ingestFromDate',\
72                                                'eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile', \
73                                                'ingestProcessID']
74                                               
75                       
76                        bits = arg.split('=')
77                        if len(bits) == 2:
78                                if bits[0] == keywordArgs[3]:
79                                        self.setIngestFromDate(bits[1])
80                                elif bits[0] == keywordArgs[2]:
81                                        self.setPollInterval(bits[1])
82                                elif bits[0] == keywordArgs[7]:
83                                        print " - Running with specified config file!"
84                                        self.setOaiConfigFile(bits[1])
85                                        checkConfInArgs = True
86                                elif bits[0] == keywordArgs[1]:
87                                        print " - Running in single file ingestion mode!"
88                                        self.setIndFileToIngest(bits[1])
89                                elif bits[0] == keywordArgs[8]:
90                                        print " - WARNING: will log results of this ingestion process into the specified logging database! (NOTE: will also take location of metadata from database as this option intended for DPWS!!)"
91                                        self.setIngestLogID(bits[1])
92                                        checkProcID = True                             
93                                elif bits[0] not in keywordArgs:
94                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
95                                        sys.exit(2)
96                                else:
97                                        setattr(self, bits[0], bits[1])
98                       
99                if checkConfInArgs is False:
100                        print "\nWARNING: Ingest config file has not been specified. Using local default"
101                        self.setOaiConfigFile('./oai_document_ingester.config')
102                       
103               
104               
105                print self.lineSeparator
106               
107                # NB, this is a slight fudge as cannot get the detailed logging to work
108                # without setting up a new logger here - which means we get two loggers
109                # outputing data. The initial call to logging needs to be tracked down
110                # and configured correctly, so this can be avoided...
111#               self.logger = logging.getLogger()
112#               self.logger.setLevel(loggingLevel)
113
114                # create console handler and set level to debug
115#               ch = logging.StreamHandler()
116#               ch.setLevel(loggingLevel)
117                # create formatter
118#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
119                # add formatter to ch
120#               ch.setFormatter(formatter)
121                # add ch to logger
122#               self.logger.addHandler(ch)
123               
124                return args
125
126
127        def getID(self, filename):
128                '''
129                Gets the identifier out of an input metadata xml record.
130                @param filename - name of document file being processed
131                @return: ID - id to use to refer to the document
132                '''
133                logging.info("Retrieving identifier for metadata record " + filename)
134       
135                xml=file(filename).read()
136               
137                ID = idget(xml)
138               
139                return ID
140       
141       
142        def addFileToPostgresDB(self, filename):
143                '''
144                Add a file to the postgres DB - extracting and storing all the required
145                data in the process
146                @param filename: full path of file to add to postgres DB
147                '''
148                logging.info("Adding file, " + filename + ", to postgres DB")
149               
150                numSlash = len(filename.split('/'))
151                               
152                shortFilename = filename.split('/')[numSlash - 1]
153               
154                self.recordAttemptToIngestDiscoveryID = ""
155               
156                self.ingestProblemMessage = ""
157               
158                #need to generate a local version of the iso data model (first call was to run through all available files
159                #and convert & place into relevant dirs as necessary
160                #generate the ISO object from the converted ISO here now..
161               
162                try:
163                                                                                                                       
164                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
165                       
166                                                       
167                        if self.isoDataModel.createISOdataStructure() is True:
168                                logging.info("ISO extractor worked fine!")
169                       
170                       
171                        elif self.isoDataModel.createISOdataStructure() is True:
172                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
173                                                #sys.exit() # TODO fix these
174                                return
175                        else:
176                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
177                                                #sys.exit()
178                                return
179                except:
180                        logging.error("Could not generate ISO XML!")
181                        return
182               
183                #get the converted ISO into a local var
184                try:
185                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
186                        self.isoXML = file(filename).read()
187                        self.originalXMLdoc = self.isoXML
188                                       
189                except:
190                        logging.warn("Could not extract converted ISO xml to local variable!")
191                        self.isoXML = ''
192                               
193               
194                # first of all create a PostgresRecord - this object represents all the data required
195                # for a DB entry
196                dao = None
197               
198                try:
199                       
200                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
201                       
202                        #record whats bout to be ingested - if a problem this, error should already have been kicked out                                       
203                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
204                       
205                        if self.isoDataModel.validDoc is True:                 
206                               
207                               
208                                #record whats attempting to be ingested
209                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
210                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
211                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML, difXML = self.difXML, isoXML = self.isoXML)                   
212                               
213                                # Now create the data access object to interface to the DB                     
214                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
215                       
216                                # Finally, write the new record
217                                # 0 if failed, 1 if update, 2 if create
218                                returnCode = dao.createOrUpdateRecord() 
219                                               
220                                if returnCode == 2:
221                                        self._no_files_ingested += 1
222                               
223                                elif returnCode == 1:
224                                        self._no_files_changed += 1
225                                       
226                        else:
227                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
228                               
229                                logging.warn(self.ingestProblemMessage)
230                               
231                                #now raise error so can record this
232                                raise ValueError,self.ingestProblemMessage
233                       
234                except:
235                       
236                       
237                        #if error encountered, add to failure lisr
238                        logging.error("Could not update: " + filename)                 
239                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
240                        self.updateFailList.append(originalRecordFilename)
241                       
242                        logging.error("Exception thrown - detail: ")
243                        errors = sys.exc_info()
244                        logging.error(errors)
245                        self._error_messages += "%s\n" %str(errors[1])
246                       
247                        if dao:
248                                logging.info("Removing record and its associated info from DB")
249                                logging.info("- to allow clean ingestion on rerun")
250                                try:
251                                        dao.deleteOriginalRecord()
252                                except:
253                                        logging.error("Problem encountered when removing record: ")
254                                        logging.error(sys.exc_info())
255                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
256
257                        self._no_problem_files += 1
258                       
259                        logging.info("Continue processing other files")
260                       
261                return self.recordAttemptToIngestDiscoveryID
262                       
263       
264        def getProcessingConfig(self,configFilePath):
265               
266                '''
267                Fed up with messing about with hardcoded directory paths. 
268                This method to get relevant values out of the oai_document_ingester.config file
269               
270                Returns a dictionary of values with keys:
271               
272                #directory in which the code resides
273                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
274
275                #base directory in which metadata is extracted to and converted to
276                base_directory /home/badc/discovery_docs/ingestDocs/
277
278                #directory in which to write reports to
279                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
280
281                #path to the passwords file
282                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
283
284                #datacentre config file directory path
285                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
286                '''
287               
288                # Check this file exists; if not, assume an invalid datacentre has been specified
289               
290                if not os.path.isfile(configFilePath):
291                    sys.exit("ERROR: Could not find the processing config file")
292                   
293                processingConfig = {}
294                   
295                processing_config_file = open(configFilePath, "r")
296               
297                for line in processing_config_file.readlines():
298                        words  = string.split(line)
299                        if len(words) == 0:
300                                continue
301                        elif words[0] == 'code_directory':
302                                processingConfig['code_directory'] = words[1]
303                                                                                               
304                        elif words[0] == 'base_directory':
305                                processingConfig['base_directory'] = words[1]
306                               
307                        elif words[0] == 'reporting_directory':
308                                processingConfig['reporting_directory'] = words[1]
309                               
310                        elif words[0] == 'passwords_file':
311                                processingConfig['passwords_file'] = words[1]
312                               
313                        elif words[0] == 'datcentre_configs':
314                                processingConfig['datcentre_configs'] = words[1]
315                                       
316                        elif words[0] == 'changeURLs':
317                                processingConfig['changeURLs'] = words[1]
318                               
319                        elif words[0] == 'NDG_redirect_URL':
320                                processingConfig['NDG_redirect_URL'] = words[1]
321                               
322                        elif words[0] == 'ingestLoggingConfig':
323                                processingConfig['ingestLoggingConfig'] = words[1]
324                               
325                        elif words[0] == 'ingestConfig':
326                                if not os.path.isfile(words[1]):
327                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
328                                else:
329                                        processingConfig['ingestConfig'] = words[1]
330                               
331                        elif words[0] == 'acceptedFormats':
332                                processingConfig['acceptedFormats'] = words[1]
333                               
334                        elif words[0] == 'saxonJarFile':
335                                if not os.path.isfile(words[1]):
336                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
337                                else:
338                                        processingConfig['saxonJarFile'] = words[1]
339                        elif words[0] == 'xpathIsoClass':
340                               
341                                #build a dictionary with format as the key                             
342                                isoClass = words[1].split(":")                         
343                               
344                                if 'xpathIsoClass' not in processingConfig.keys():
345                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
346                                else:
347                                        temp = processingConfig['xpathIsoClass']                                       
348                                        temp.update({isoClass[0]:isoClass[1]})
349                                        processingConfig['xpathIsoClass'] = temp
350                               
351                        elif words[0] == 'xqueryStubIsoGen':
352                               
353                                #build a dictionary with format as the key
354                                isoClass = words[1].split(":")
355                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
356                               
357                        elif words[0] == 'xqueryConversions':
358                               
359                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
360                                processingConfig['xqueryConversions'] = words[1].split(",")
361                               
362                        elif words[0] == 'currentMEDIN':
363                               
364                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
365                                processingConfig['currentMEDIN'] = words[1]
366                               
367                        elif words[0] == 'exceptXqConv':
368                               
369                                #build a dictionary with format as the key
370                                xqClass = words[1].split(":")
371                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
372                               
373                        elif words[0] == 'xqDocTypes':
374                               
375                                #build a dictionary with format as the key
376                                xqClass = words[1].split(":")
377                               
378                                if 'xqDocTypes' not in processingConfig.keys():
379                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
380                                else:
381                                        temp = processingConfig['xqDocTypes']                                   
382                                        temp.update({xqClass[0]:xqClass[1]})
383                                        processingConfig['xqDocTypes'] = temp
384                                       
385                processing_config_file.close()
386                       
387                return processingConfig
388       
389       
390        def getConfigDetails(self, datacentre):
391                '''
392                Get the harvested records directory and groups for this datacentre from the
393                datacentre specific config file.  The harvested records directory depends on the
394                datacentres OAI base url, the set and format. These have to be know up-front.
395                The groups denote which 'portal groups' they belong to - for limiting searches to
396                say NERC-only datacentres records.
397                Groups are added to the intermediate MOLES when it is created.
398                @param datacentre: datacentre to use when looking up config file
399                '''
400                # initialise the variables to retrieve from the config file
401                self._harvest_home = ""
402                self._datacentre_groups = ""
403                self._datacentre_format = ""
404                self._datacentre_namespace = ""
405                self._NDG_dataProvider = False
406                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
407               
408               
409                #note changed to production buildout path
410                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
411               
412                #fed up with this rubbish causing problems - use a normal file opener.
413                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
414               
415               
416                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
417                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
418               
419                # Check this file exists; if not, assume an invalid datacentre has been specified
420                if not os.path.isfile(self._datacentre_config_filename):
421                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
422                        "specified (%s) is invalid\n" %datacentre)
423                   
424               
425                datacentre_config_file = open(self._datacentre_config_filename, "r")
426               
427                for line in datacentre_config_file.readlines():
428                        words  = string.split(line)
429                        if len(words) == 0:
430                                continue
431                        elif words[0] == 'host_path':
432                                self._harvest_home = words[1].rstrip()
433                        elif words[0] == 'groups':
434                                self._datacentre_groups = words[1:]
435                        elif words[0] == 'format':
436                                self._datacentre_format = words[1]
437                        elif words[0] == 'namespace':
438                                self._datacentre_namespace = words[1]
439                        elif words[0] == 'NDG_dataProvider':
440                                self._NDG_dataProvider = True
441                        elif words[0] == 'feed':
442                                self._IngestViaFeed = True
443                       
444                datacentre_config_file.close()
445               
446                #stop ingest on this datacentre if process thread is OAI and the config says FEED
447                if ((self.processThread == 'OAI') and self._IngestViaFeed):
448                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
449                                               
450                if self._harvest_home == "":
451                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
452               
453                logging.info("harvested records are in " + self._harvest_home)
454               
455                if self._datacentre_groups == "":
456                    logging.info("No groups/keywords set for datacentre " + datacentre)
457                else:
458                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
459               
460                if self._datacentre_format == "":
461                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
462               
463                logging.info("format being harvested: " + self._datacentre_format)
464               
465                if self._datacentre_namespace == "":
466                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
467               
468                logging.info("datacentre namespace: " + self._datacentre_namespace)
469               
470                if self._NDG_dataProvider:
471                        logging.info("Datacentre classified as an NDG data provider")
472                else:
473                        logging.info("Datacentre is not classified as an NDG data provider")
474                logging.info(self.lineSeparator)
475               
476               
477        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
478                '''
479                Processes/renames the files (changed 08/01/07 to get id from inside file)
480                 - also replace any namespace declarations with a standard one which we know works in NDG
481                 NB, this copies files from the original dir to the discovery dir
482                 @param originals_dir: directory to convert files from
483                 @param discovery_dir: directory in which converted files will end up
484                 @return numfilesproc: counter of number of files processed
485                '''
486                numfilesproc = 0
487                badXMLfiles = 0         
488               
489                self.inputFileOrigFinal = {}
490                self.badXMLfileList = {}               
491                self.difXML = None
492               
493                logging.info(self.lineSeparator)
494                logging.info("Renaming files:")
495               
496                if self.ingestProcessID is not None and self.procID is None:
497                       
498                        self._getPostgresDBConnectionLogging()
499                                               
500                        #if process id has been supplied, then update the logging db with a "start_ingest"
501                        sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
502                        self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
503               
504                for filename in os.listdir(originals_dir):
505                       
506                        if not filename.endswith('.xml'):
507                                logging.warning('File %s is not xml format. Not processed'  %(filename))
508                                continue
509                       
510                        original_filename = originals_dir + filename
511                       
512                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
513                        if format == 'DIF_9.4':
514                                                       
515                                logging.info("Converting DIF to stub ISO...")                                   
516                               
517                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
518                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
519                                                                                                               
520                                #gets the metadata id from the xml
521                                metadataID=self.getID(original_filename)
522                                #import pdb
523                                #pdb.set_trace()
524                                                               
525                                repositoryName = self._datacentre_namespace
526                               
527                                #get the name of the file to be used in the xquery
528                                metadataFilename = filename.replace('.xml', '')
529                                                               
530                                #where is the file to be ingested located?
531                                metadataFileLoc = originals_dir
532                               
533                                #generate a new stubISO filename
534                                self.isoFormat= "stubISO"
535                                                               
536                               
537                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
538                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
539                                                                                                                       
540                                #get hold of the xml for the transformed ISO                           
541                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
542                                                               
543                                #create the iso file on the system
544                                tempStubISOfile = original_filename + ".stubISO.xml"
545                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
546                               
547                                #generate the ISO object from the converted ISO here now..
548                                try:
549                                       
550                                        #temp assignment so failure mode supported
551                                        new_filename_short = original_filename
552                                                                       
553                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
554                               
555                                        if isoDataModel.createISOdataStructure() is True:
556                                                logging.info("ISO extractor worked fine!")
557                                               
558                                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
559                                                self.originalXMLdoc = ''
560                               
561                                                try:
562                                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
563                                                        self.originalXMLdoc = file(original_filename).read()
564                                                        self.difXML = self.originalXMLdoc
565                                       
566                                                except:
567                                                        logging.warn("Could not extract original file to local variable!")
568                                                       
569                                               
570                                                #Back to main stream of ISO ingest
571                                                if self._NDG_dataProvider:
572                                       
573                                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
574                                                               
575                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
576                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
577                                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
578                               
579                                       
580                                                else:
581                                                        ident = isoDataModel.datasetID[0][0]
582                                       
583                                                        ident = ident.replace(":", "-")
584                                                        ident = ident.replace("/", "-")
585                               
586                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
587                                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
588                               
589                                       
590                               
591                                                #now create this stub ISO file on system so can access it
592                                                self.genIsoFile = discovery_dir + new_filename_short   
593                                                               
594                                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
595                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
596                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
597                                               
598                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
599                                                #this links a derived filename in processing dir with original filename
600                                                #get basic filename from the path+filename passed to this function to use as key               
601                                                self.inputFileOrigFinal[new_filename_short]=filename
602                                               
603                                                numfilesproc += 1
604                       
605                       
606                                        elif isoDataModel.createISOdataStructure() is True:
607                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
608                                                #sys.exit() # TODO fix these
609                                               
610                                        else:
611                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
612                                                badXMLfiles += 1
613                                                self.badXMLfileList[new_filename_short]=filename
614                                                                       
615                                except:
616                                        logging.error("Could not generate ISO XML from DIF format!!")
617                                        badXMLfiles += 1
618                                        self.badXMLfileList[new_filename_short]=filename
619                               
620                       
621                        #all other ISO formats/profiles to deal with.
622                        else:                           
623                                isoFormat = format
624                               
625                                self.originalXMLdoc = "null"
626                               
627                               
628                                #generate the ISO object from the ISO xml..                                                                             
629                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
630                               
631                                if isoDataModel.createISOdataStructure() is True:
632                                                logging.info("ISO extractor worked fine!")
633                                               
634                                                #Back to main stream of ISO ingest
635                                                if self._NDG_dataProvider:
636                                                               
637                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
638                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
639                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
640                               
641                                                else:
642                                       
643                                                        ident = isoDataModel.datasetID[0][0]
644                               
645                                                        ident = ident.replace(":", "-")
646                                                        ident = ident.replace("/", "-")
647                               
648                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
649                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
650                               
651                                                #now create this stub ISO file on system so can access it
652                                                self.genIsoFile = discovery_dir + new_filename_short   
653                               
654                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
655                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
656                               
657                                                #get the converted ISO into a local var
658                                                try:
659                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
660                                                        self.isoXML = file(self.genIsoFile).read()
661                                                        self.originalXMLdoc = self.isoXML
662                                       
663                                                except:
664                                                        logging.warn("Could not extract converted ISO xml to local variable!")
665                                                        self.isoXML = ''
666                                                       
667                                                logging.info("original file = " + original_filename)
668                                                logging.info("newfile = " + new_filename)
669                               
670                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
671                                                #this links a derived filename in processing dir with original filename
672                                                #get basic filename from the path+filename passed to this function to use as key               
673                                                self.inputFileOrigFinal[new_filename_short]=filename
674                                               
675                                                numfilesproc += 1
676                                                                                               
677                                elif isoDataModel.createISOdataStructure() is True:
678                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
679                                                #sys.exit() # TODO fix these
680                                       
681                                else:                                   
682                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
683                                        badXMLfiles += 1
684                                        self.badXMLfileList[new_filename_short]=filename
685                       
686                logging.info("File renaming and converting completed")
687                logging.info(self.lineSeparator)
688               
689                       
690                return numfilesproc,badXMLfiles
691       
692       
693        def _getPostgresDBConnection(self):
694                '''
695                Get the default postgres DB connection - by reading in data from the db config file
696                '''
697               
698                logging.debug("Setting up connection to postgres DB")
699                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
700               
701               
702                self.pgc = pgc(configFile = self._databaseConfigurationFile)
703                logging.info("Postgres DB connection now set up")
704               
705               
706       
707        def _getPostgresDBConnectionLogging(self):
708                '''
709                Get the default postgres DB connection - by reading in data from the db config file
710                '''
711               
712                logging.debug("Setting up connection to postgres DB")
713                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
714               
715               
716                self.pgc_IngestLog = pgc(configFile = self.ingestLoggingDatabaseConfigurationFile)
717                logging.info("Postgres DB connection now set up")
718
719
720
721        def     _backupAndCleanData(self):
722                '''
723                Back up ingested data for specified data centre, then clean any of
724                the ingest dirs
725                '''
726                logging.info("Backing up ingested data")
727                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
728                  strftime("%y%m%d_%H%M") + "_originals/"
729               
730                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
731                logging.info("Data backed up - now clearing ingest directories")
732                #Clear out the original harvest records area and discovery dir
733                #FileUtilities.cleanDir(self.originals_dir)
734                #FileUtilities.cleanDir(self.discovery_dir)
735                logging.info("Ingest directories cleared")
736
737
738        def     _setupDataCentreDirs(self):
739                '''
740                Set up directories appropriate for the current data centre
741                '''
742                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
743               
744               
745                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
746               
747                # the following dirs define where the specific documents should go
748                self.originals_dir = data_dir + "/oai/originals/" # copy of original
749                self.discovery_dir = data_dir + "/discovery/"
750                #self.stubISO_dir = data_dir + "/stub_iso/"
751               
752                # Create/clear the 'in' and 'out' directories
753                FileUtilities.setUpDir(self.originals_dir)
754                FileUtilities.setUpDir(self.discovery_dir)
755               
756                logging.info("Ingest directories for data centre set up")
757
758
759        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
760                '''
761                Convert files from originals dir to discovery one, then
762                ingest, backup and clean
763                @param originals_dir: directory to ingest docs from
764                @param discovery_dir: directory to use to process ingested docs
765                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
766                '''
767               
768                self.thisIngestStartDate = str(datetime.datetime.now())
769               
770                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
771               
772                filenames = os.listdir(self.discovery_dir)
773               
774                ingestProbMsg = ""
775                               
776                #generate a list of files ALREADY in database so can compare with what has been ingested
777                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
778                               
779                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
780                filesPresentList=[]
781       
782               
783                if filePresentListArr:
784                        for fileArr in filePresentListArr:
785                                filesPresentList.append(fileArr[0])
786                               
787                                #TODO - is above relevant - duplicating functionlaity?
788       
789                #create list to to hold files ingest failed on.
790                self.updateFailList = []
791                self.deletedFailList = []
792                self.problemMessageList = {}   
793               
794                counter = 1
795                               
796                for filename in filenames:
797                       
798                        fullPath = self.discovery_dir + filename
799                       
800                        #if process id has been supplied, then update the logging db with a "start_ingest"
801                        #sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest %s/%s');" %(self.ingestProcessID,counter,numfilesproc)
802                        #sqlStatusCmd = "select update_ingest_status (%s, 'run_ingest');" %self.ingestProcessID
803                        #self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
804                       
805                        if os.path.isfile(fullPath):
806                                       
807                                thisIngestedID = self.addFileToPostgresDB(fullPath)
808                               
809                                #record all problem messages
810                                #ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
811                               
812                                if self.ingestProblemMessage != '':
813                                        ingestProbMsg = ingestProbMsg + self.ingestProblemMessage
814                                       
815                                        self.problemMessageList[filename] = self.ingestProblemMessage
816                                       
817                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
818                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
819                                        if thisIngestedID in filesPresentList: 
820                                                filesPresentList.remove(thisIngestedID)
821                       
822                        counter += 1   
823                               
824                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
825                #will need to be removed.
826               
827                #only do this if not in single file mode (or else wverything else gets deleted!)               
828                if ((self.indFileToIngest == "") & (feed)):
829                        for item in filesPresentList:
830                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
831                                DeleteRecord(item)
832                                self.deletedFailList.append(item)
833                                self._no_files_deleted += 1
834                       
835                       
836                self._backupAndCleanData()
837               
838                #at this stage put the reporting code in (will work for oai or atom feed)
839                #create a summary file for each data centre ingest
840                data_dir = self._base_dir + "data/"
841                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
842                recOpFile = open(recOpFileName,'w')
843               
844                logging.info("oai_document_ingest processing complete:")
845               
846                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
847                self.thisIngestEndDate = str(datetime.datetime.now())
848               
849                message = "Ingest report for data centre: " + datacentre + "\n"
850                message = message + "Ingest date: " + self.thisIngestEndDate + "\n"
851                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
852                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
853                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
854                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
855                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
856                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
857                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"           
858                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
859                                               
860                if len(self.updateFailList) > 0:
861                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
862               
863                        for badFile in self.updateFailList:
864                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")     
865                                message = message +"\nPROBLEM_FILE " + badFile + "\n"
866               
867                if len(self.badXMLfileList.keys()) > 0:
868                        message = message +  "\nBelow are the files that could not be parsed (i.e. bad XML?)...\n\n"
869                        for badXmlFile in self.badXMLfileList.values():
870                                message = message + "BAD_XML_FILE " + badXmlFile +"\n"
871               
872                                               
873                #log processing results to ingestLogging table if required to           
874               
875                if self.ingestProcessID is not None:
876                        logging.info("Updating ingest logging database")
877                                               
878                        sqlLoggingCmd = "select add_ingest_stats(%s,'%s','%s','%s','%s','%s','%s','%s','%s','%s');"%(self.ingestProcessID,self.thisIngestStartDate,self.thisIngestEndDate,numfilesproc+badXmlFiles,numfilesproc,badXmlFiles,self._no_files_ingested,self._no_files_changed,self._no_files_deleted,self._no_problem_files)
879                                       
880                        #update ingestLogging                   
881                        self.pgc_IngestLog.runSQLCommand(sqlLoggingCmd)
882                       
883                        #now update problem file log if there are problem files
884                        if self._no_problem_files > 0:
885                                logging.info("There are %s problem files - recording these in stats logging db" %self._no_problem_files)
886                               
887                                #note sometimes message will not always be recorded, so just record name of file
888                                if len(self.problemMessageList.keys()) < 1:
889                                       
890                                        for badFile in self.updateFailList:
891                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,'Could not catch precise error for this file!!')
892                               
893                                                #update ingestLogging                   
894                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)
895                               
896                                else:
897                                        for badFile in self.problemMessageList.keys():                                 
898                                                sqlProbFileCmd = "select add_problem_file(%s,'%s','%s');"%(self.ingestProcessID,badFile,self.problemMessageList[badFile])
899                                                       
900                                                #update ingestLogging                   
901                                                self.pgc_IngestLog.runSQLCommand(sqlProbFileCmd)               
902                               
903                                       
904                        #if process id has been supplied, then update the logging db with a "start_ingest" (only for DPWS ops)
905                        if self.procID is None:
906                                sqlStatusCmd = "select update_ingest_status (%s, 'end_ingest');" %self.ingestProcessID
907                                self.pgc_IngestLog.runSQLCommand(sqlStatusCmd)
908                       
909                #update with content from the problem messages recorded
910                message = message + ingestProbMsg + "\n"               
911                         
912                recOpFile.write(message)
913                               
914                return numfilesproc,badXmlFiles, message
915
916
917
918        def _setupXQueries(self):
919                '''
920                Set up the required XQueries
921                - NB, extract the xquery libraries locally for easy reference
922                '''
923                self._xq = ndgResources()
924                for libFile in self._xq.xqlib:
925                        # NB, we don't want the full path to the files - just the filename
926                        fileName = libFile.split('/')[-1]
927                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.