source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6891

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6891
Revision 6891, 31.0 KB checked in by sdonegan, 9 years ago (diff)

further bug fixes and reporting updates and update to run_all_ingest for medin style db

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.WARNING
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False
68                for arg in args:
69                       
70                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile']                                             
71                       
72                        bits = arg.split('=')
73                        if len(bits) == 2:
74                                if bits[0] == keywordArgs[0]:
75                                        self.setIngestFromDate(bits[1])
76                                elif bits[0] == keywordArgs[1]:
77                                        self.setPollInterval(bits[1])
78                                elif bits[0] == keywordArgs[8]:
79                                        print " - Running with specified config file!"
80                                        self.setOaiConfigFile(bits[1])
81                                        checkConfInArgs = True
82                                elif bits[0] == keywordArgs[2]:
83                                        print " - Running in single file ingestion mode!"
84                                        self.setIndFileToIngest(bits[1])
85                                elif bits[0] not in keywordArgs:
86                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
87                                        sys.exit(2)
88                                else:
89                                        setattr(self, bits[0], bits[1])
90                       
91                if checkConfInArgs is False:
92                        print "\nWARNING: You must now include the ingestConfigFile arguement for successful operation"
93                        sys.exit(2)
94               
95                print self.lineSeparator
96               
97                # NB, this is a slight fudge as cannot get the detailed logging to work
98                # without setting up a new logger here - which means we get two loggers
99                # outputing data. The initial call to logging needs to be tracked down
100                # and configured correctly, so this can be avoided...
101#               self.logger = logging.getLogger()
102#               self.logger.setLevel(loggingLevel)
103
104                # create console handler and set level to debug
105#               ch = logging.StreamHandler()
106#               ch.setLevel(loggingLevel)
107                # create formatter
108#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
109                # add formatter to ch
110#               ch.setFormatter(formatter)
111                # add ch to logger
112#               self.logger.addHandler(ch)
113               
114                return args
115
116
117        def getID(self, filename):
118                '''
119                Gets the identifier out of an input metadata xml record.
120                @param filename - name of document file being processed
121                @return: ID - id to use to refer to the document
122                '''
123                logging.info("Retrieving identifier for metadata record " + filename)
124       
125                xml=file(filename).read()
126               
127                ID = idget(xml)
128               
129                return ID
130       
131       
132        def addFileToPostgresDB(self, filename):
133                '''
134                Add a file to the postgres DB - extracting and storing all the required
135                data in the process
136                @param filename: full path of file to add to postgres DB
137                '''
138                logging.info("Adding file, " + filename + ", to postgres DB")
139               
140                numSlash = len(filename.split('/'))
141                               
142                shortFilename = filename.split('/')[numSlash - 1]
143               
144                self.recordAttemptToIngestDiscoveryID = ""
145               
146                self.ingestProblemMessage = ""
147               
148                #need to generate a local version of the iso data model (first call was to run through all available files
149                #and convert & place into relevant dirs as necessary
150                #generate the ISO object from the converted ISO here now..
151               
152                try:
153                                                                                                                       
154                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
155                       
156                                                       
157                        if self.isoDataModel.createISOdataStructure() is True:
158                                logging.info("ISO extractor worked fine!")
159                       
160                       
161                        elif self.isoDataModel.createISOdataStructure() is True:
162                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
163                                                #sys.exit() # TODO fix these
164                                return
165                        else:
166                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
167                                                #sys.exit()
168                                return
169                except:
170                        logging.error("Could not generate ISO XML!")
171                        return
172               
173                #get the converted ISO into a local var
174                try:
175                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
176                        self.isoXML = file(filename).read()
177                        self.originalXMLdoc = self.isoXML
178                                       
179                except:
180                        logging.warn("Could not extract converted ISO xml to local variable!")
181                        self.isoXML = ''
182                               
183               
184                # first of all create a PostgresRecord - this object represents all the data required
185                # for a DB entry
186                dao = None
187               
188                try:
189                       
190                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
191                       
192                        #record whats bout to be ingested - if a problem this, error should already have been kicked out                                       
193                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
194                       
195                        if self.isoDataModel.validDoc is True:                 
196                                                                               
197                               
198                                #record whats attempting to be ingested
199                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
200                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
201                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML)                       
202                               
203                                # Now create the data access object to interface to the DB                     
204                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
205                       
206                                # Finally, write the new record
207                                # 0 if failed, 1 if update, 2 if create
208                                returnCode = dao.createOrUpdateRecord() 
209                                               
210                                if returnCode == 2:
211                                        self._no_files_ingested += 1
212                               
213                                elif returnCode == 1:
214                                        self._no_files_changed += 1
215                                       
216                        else:
217                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
218                               
219                                logging.warn(self.ingestProblemMessage)
220                               
221                                #now raise error so can record this
222                                raise ValueError,ingestMessage
223                       
224                except:
225                       
226                       
227                        #if error encountered, add to failure lisr
228                        logging.error("Could not update: " + filename)                 
229                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
230                        self.updateFailList.append(originalRecordFilename)
231                       
232                        logging.error("Exception thrown - detail: ")
233                        errors = sys.exc_info()
234                        logging.error(errors)
235                        self._error_messages += "%s\n" %str(errors[1])
236                       
237                        if dao:
238                                logging.info("Removing record and its associated info from DB")
239                                logging.info("- to allow clean ingestion on rerun")
240                                try:
241                                        dao.deleteOriginalRecord()
242                                except:
243                                        logging.error("Problem encountered when removing record: ")
244                                        logging.error(sys.exc_info())
245                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
246
247                        self._no_problem_files += 1
248                       
249                        logging.info("Continue processing other files")
250                       
251                return self.recordAttemptToIngestDiscoveryID
252                       
253       
254        def getProcessingConfig(self,configFilePath):
255               
256                '''
257                Fed up with messing about with hardcoded directory paths. 
258                This method to get relevant values out of the oai_document_ingester.config file
259               
260                Returns a dictionary of values with keys:
261               
262                #directory in which the code resides
263                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
264
265                #base directory in which metadata is extracted to and converted to
266                base_directory /home/badc/discovery_docs/ingestDocs/
267
268                #directory in which to write reports to
269                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
270
271                #path to the passwords file
272                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
273
274                #datacentre config file directory path
275                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
276                '''
277               
278                # Check this file exists; if not, assume an invalid datacentre has been specified
279               
280                if not os.path.isfile(configFilePath):
281                    sys.exit("ERROR: Could not find the processing config file")
282                   
283                processingConfig = {}
284                   
285                processing_config_file = open(configFilePath, "r")
286               
287                for line in processing_config_file.readlines():
288                        words  = string.split(line)
289                        if len(words) == 0:
290                                continue
291                        elif words[0] == 'code_directory':
292                                processingConfig['code_directory'] = words[1]
293                                                                                               
294                        elif words[0] == 'base_directory':
295                                processingConfig['base_directory'] = words[1]
296                               
297                        elif words[0] == 'reporting_directory':
298                                processingConfig['reporting_directory'] = words[1]
299                               
300                        elif words[0] == 'passwords_file':
301                                processingConfig['passwords_file'] = words[1]
302                               
303                        elif words[0] == 'datcentre_configs':
304                                processingConfig['datcentre_configs'] = words[1]
305                                       
306                        elif words[0] == 'changeURLs':
307                                processingConfig['changeURLs'] = words[1]
308                               
309                        elif words[0] == 'NDG_redirect_URL':
310                                processingConfig['NDG_redirect_URL'] = words[1]
311                               
312                        elif words[0] == 'ingestConfig':
313                                if not os.path.isfile(words[1]):
314                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
315                                else:
316                                        processingConfig['ingestConfig'] = words[1]
317                               
318                        elif words[0] == 'acceptedFormats':
319                                processingConfig['acceptedFormats'] = words[1]
320                               
321                        elif words[0] == 'saxonJarFile':
322                                if not os.path.isfile(words[1]):
323                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
324                                else:
325                                        processingConfig['saxonJarFile'] = words[1]
326                        elif words[0] == 'xpathIsoClass':
327                               
328                                #build a dictionary with format as the key                             
329                                isoClass = words[1].split(":")                         
330                               
331                                if 'xpathIsoClass' not in processingConfig.keys():
332                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
333                                else:
334                                        temp = processingConfig['xpathIsoClass']                                       
335                                        temp.update({isoClass[0]:isoClass[1]})
336                                        processingConfig['xpathIsoClass'] = temp
337                               
338                        elif words[0] == 'xqueryStubIsoGen':
339                               
340                                #build a dictionary with format as the key
341                                isoClass = words[1].split(":")
342                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
343                               
344                        elif words[0] == 'xqueryConversions':
345                               
346                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
347                                processingConfig['xqueryConversions'] = words[1].split(",")
348                               
349                        elif words[0] == 'currentMEDIN':
350                               
351                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
352                                processingConfig['currentMEDIN'] = words[1]
353                               
354                        elif words[0] == 'exceptXqConv':
355                               
356                                #build a dictionary with format as the key
357                                xqClass = words[1].split(":")
358                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
359                               
360                        elif words[0] == 'xqDocTypes':
361                               
362                                #build a dictionary with format as the key
363                                xqClass = words[1].split(":")
364                               
365                                if 'xqDocTypes' not in processingConfig.keys():
366                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
367                                else:
368                                        temp = processingConfig['xqDocTypes']                                   
369                                        temp.update({xqClass[0]:xqClass[1]})
370                                        processingConfig['xqDocTypes'] = temp
371                                       
372                processing_config_file.close()
373                       
374                return processingConfig
375       
376       
377        def getConfigDetails(self, datacentre):
378                '''
379                Get the harvested records directory and groups for this datacentre from the
380                datacentre specific config file.  The harvested records directory depends on the
381                datacentres OAI base url, the set and format. These have to be know up-front.
382                The groups denote which 'portal groups' they belong to - for limiting searches to
383                say NERC-only datacentres records.
384                Groups are added to the intermediate MOLES when it is created.
385                @param datacentre: datacentre to use when looking up config file
386                '''
387                # initialise the variables to retrieve from the config file
388                self._harvest_home = ""
389                self._datacentre_groups = ""
390                self._datacentre_format = ""
391                self._datacentre_namespace = ""
392                self._NDG_dataProvider = False
393                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
394               
395               
396                #note changed to production buildout path
397                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
398               
399                #fed up with this rubbish causing problems - use a normal file opener.
400                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
401               
402               
403                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
404                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
405               
406                # Check this file exists; if not, assume an invalid datacentre has been specified
407                if not os.path.isfile(self._datacentre_config_filename):
408                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
409                        "specified (%s) is invalid\n" %datacentre)
410                   
411               
412                datacentre_config_file = open(self._datacentre_config_filename, "r")
413               
414                for line in datacentre_config_file.readlines():
415                        words  = string.split(line)
416                        if len(words) == 0:
417                                continue
418                        elif words[0] == 'host_path':
419                                self._harvest_home = words[1].rstrip()
420                        elif words[0] == 'groups':
421                                self._datacentre_groups = words[1:]
422                        elif words[0] == 'format':
423                                self._datacentre_format = words[1]
424                        elif words[0] == 'namespace':
425                                self._datacentre_namespace = words[1]
426                        elif words[0] == 'NDG_dataProvider':
427                                self._NDG_dataProvider = True
428                        elif words[0] == 'feed':
429                                self._IngestViaFeed = True
430                       
431                datacentre_config_file.close()
432               
433                #stop ingest on this datacentre if process thread is OAI and the config says FEED
434                if ((self.processThread == 'OAI') and self._IngestViaFeed):
435                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
436                                               
437                if self._harvest_home == "":
438                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
439               
440                logging.info("harvested records are in " + self._harvest_home)
441               
442                if self._datacentre_groups == "":
443                    logging.info("No groups/keywords set for datacentre " + datacentre)
444                else:
445                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
446               
447                if self._datacentre_format == "":
448                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
449               
450                logging.info("format being harvested: " + self._datacentre_format)
451               
452                if self._datacentre_namespace == "":
453                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
454               
455                logging.info("datacentre namespace: " + self._datacentre_namespace)
456               
457                if self._NDG_dataProvider:
458                        logging.info("Datacentre classified as an NDG data provider")
459                else:
460                        logging.info("Datacentre is not classified as an NDG data provider")
461                logging.info(self.lineSeparator)
462               
463               
464        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
465                '''
466                Processes/renames the files (changed 08/01/07 to get id from inside file)
467                 - also replace any namespace declarations with a standard one which we know works in NDG
468                 NB, this copies files from the original dir to the discovery dir
469                 @param originals_dir: directory to convert files from
470                 @param discovery_dir: directory in which converted files will end up
471                 @return numfilesproc: counter of number of files processed
472                '''
473                numfilesproc = 0
474                badXMLfiles = 0         
475               
476                self.inputFileOrigFinal = {}
477                self.badXMLfileList = {}
478               
479                logging.info(self.lineSeparator)
480                logging.info("Renaming files:")
481               
482                for filename in os.listdir(originals_dir):
483                       
484                        if not filename.endswith('.xml'):
485                                logging.warning('File %s is not xml format. Not processed'  %(filename))
486                                continue
487                       
488                        original_filename = originals_dir + filename
489                       
490                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
491                        if format == 'DIF_9.4':
492                                                       
493                                logging.info("Converting DIF to stub ISO...")                                   
494                               
495                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
496                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
497                                                                                                               
498                                #gets the metadata id from the xml
499                                metadataID=self.getID(original_filename)
500                                #import pdb
501                                #pdb.set_trace()
502                                                               
503                                repositoryName = self._datacentre_namespace
504                               
505                                #get the name of the file to be used in the xquery
506                                metadataFilename = filename.replace('.xml', '')
507                                                               
508                                #where is the file to be ingested located?
509                                metadataFileLoc = originals_dir
510                               
511                                #generate a new stubISO filename
512                                self.isoFormat= "stubISO"
513                                                               
514                               
515                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
516                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
517                                                                                                                       
518                                #get hold of the xml for the transformed ISO                           
519                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
520                                                               
521                                #create the iso file on the system
522                                tempStubISOfile = original_filename + ".stubISO.xml"
523                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
524                               
525                                #generate the ISO object from the converted ISO here now..
526                                try:
527                                                                                               
528                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
529                               
530                                        if isoDataModel.createISOdataStructure() is True:
531                                                logging.info("ISO extractor worked fine!")
532                                               
533                                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
534                                                self.originalXMLdoc = ''
535                               
536                                                try:
537                                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
538                                                        self.originalXMLdoc = file(original_filename).read()
539                                       
540                                                except:
541                                                        logging.warn("Could not extract original file to local variable!")
542                               
543                               
544                                                #Back to main stream of ISO ingest
545                                                if self._NDG_dataProvider:
546                                       
547                                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
548                                                               
549                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
550                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
551                                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
552                               
553                                       
554                                                else:
555                                                        ident = isoDataModel.datasetID[0][0]
556                                       
557                                                        ident = ident.replace(":", "-")
558                                                        ident = ident.replace("/", "-")
559                               
560                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
561                                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
562                               
563                                       
564                               
565                                                #now create this stub ISO file on system so can access it
566                                                self.genIsoFile = discovery_dir + new_filename_short   
567                                                               
568                                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
569                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
570                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
571                                               
572                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
573                                                #this links a derived filename in processing dir with original filename
574                                                #get basic filename from the path+filename passed to this function to use as key               
575                                                self.inputFileOrigFinal[new_filename_short]=filename
576                                               
577                                                numfilesproc += 1
578                       
579                       
580                                        elif isoDataModel.createISOdataStructure() is True:
581                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
582                                                #sys.exit() # TODO fix these
583                                               
584                                        else:
585                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
586                                                badXMLfiles += 1
587                                                self.badXMLfileList[new_filename_short]=filename
588                                                                       
589                                except:
590                                        logging.error("Could not generate ISO XML from DIF format!!")
591                                        badXMLfiles += 1
592                                        self.badXMLfileList[new_filename_short]=filename
593                               
594                       
595                        #all other ISO formats/profiles to deal with.
596                        else:                           
597                                isoFormat = format
598                               
599                                self.originalXMLdoc = "null"
600                               
601                               
602                                #generate the ISO object from the ISO xml..                                                                             
603                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
604                               
605                                if isoDataModel.createISOdataStructure() is True:
606                                                logging.info("ISO extractor worked fine!")
607                                               
608                                                #Back to main stream of ISO ingest
609                                                if self._NDG_dataProvider:
610                                                               
611                                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
612                                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
613                                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
614                               
615                                                else:
616                                       
617                                                        ident = isoDataModel.datasetID[0][0]
618                               
619                                                        ident = ident.replace(":", "-")
620                                                        ident = ident.replace("/", "-")
621                               
622                                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
623                                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
624                               
625                                                #now create this stub ISO file on system so can access it
626                                                self.genIsoFile = discovery_dir + new_filename_short   
627                               
628                                                #generate the redirect urls (if set to!) - now we have worked out the full discovery ndg name..
629                                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile,self.changeUrls)
630                               
631                                                #get the converted ISO into a local var
632                                                try:
633                                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
634                                                        self.isoXML = file(self.genIsoFile).read()
635                                                        self.originalXMLdoc = self.isoXML
636                                       
637                                                except:
638                                                        logging.warn("Could not extract converted ISO xml to local variable!")
639                                                        self.isoXML = ''
640                                                       
641                                                logging.info("original file = " + original_filename)
642                                                logging.info("newfile = " + new_filename)
643                               
644                                                #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
645                                                #this links a derived filename in processing dir with original filename
646                                                #get basic filename from the path+filename passed to this function to use as key               
647                                                self.inputFileOrigFinal[new_filename_short]=filename
648                                               
649                                                numfilesproc += 1
650                                                                                               
651                                elif isoDataModel.createISOdataStructure() is True:
652                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
653                                                #sys.exit() # TODO fix these
654                                       
655                                else:                                   
656                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
657                                        badXMLfiles += 1
658                                        self.badXMLfileList[new_filename_short]=filename
659                       
660                logging.info("File renaming and converting completed")
661                logging.info(self.lineSeparator)
662               
663                       
664                return numfilesproc,badXMLfiles
665       
666       
667        def _getPostgresDBConnection(self):
668                '''
669                Get the default postgres DB connection - by reading in data from the db config file
670                '''
671               
672                logging.debug("Setting up connection to postgres DB")
673                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
674               
675               
676                self.pgc = pgc(configFile = self._databaseConfigurationFile)
677                logging.info("Postgres DB connection now set up")
678
679
680
681        def     _backupAndCleanData(self):
682                '''
683                Back up ingested data for specified data centre, then clean any of
684                the ingest dirs
685                '''
686                logging.info("Backing up ingested data")
687                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
688                  strftime("%y%m%d_%H%M") + "_originals/"
689               
690                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
691                logging.info("Data backed up - now clearing ingest directories")
692                #Clear out the original harvest records area and discovery dir
693                #FileUtilities.cleanDir(self.originals_dir)
694                #FileUtilities.cleanDir(self.discovery_dir)
695                logging.info("Ingest directories cleared")
696
697
698        def     _setupDataCentreDirs(self):
699                '''
700                Set up directories appropriate for the current data centre
701                '''
702                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
703               
704               
705                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
706               
707                # the following dirs define where the specific documents should go
708                self.originals_dir = data_dir + "/oai/originals/" # copy of original
709                self.discovery_dir = data_dir + "/discovery/"
710                #self.stubISO_dir = data_dir + "/stub_iso/"
711               
712                # Create/clear the 'in' and 'out' directories
713                FileUtilities.setUpDir(self.originals_dir)
714                FileUtilities.setUpDir(self.discovery_dir)
715               
716                logging.info("Ingest directories for data centre set up")
717
718
719        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
720                '''
721                Convert files from originals dir to discovery one, then
722                ingest, backup and clean
723                @param originals_dir: directory to ingest docs from
724                @param discovery_dir: directory to use to process ingested docs
725                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
726                '''
727               
728               
729                numfilesproc,badXmlFiles = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
730               
731                filenames = os.listdir(self.discovery_dir)
732               
733                ingestProbMsg = ""
734                               
735                #generate a list of files ALREADY in database so can compare with what has been ingested
736                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
737                               
738                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
739                filesPresentList=[]
740       
741               
742                if filePresentListArr:
743                        for fileArr in filePresentListArr:
744                                filesPresentList.append(fileArr[0])
745                               
746                                #TODO - is above relevant - duplicating functionlaity?
747       
748                #create list to to hold files ingest failed on.
749                self.updateFailList = []
750                self.deletedFailList = []                               
751               
752                for filename in filenames:
753                       
754                        fullPath = self.discovery_dir + filename
755                       
756                        if os.path.isfile(fullPath):
757                                       
758                                thisIngestedID = self.addFileToPostgresDB(fullPath)
759                               
760                                #record all problem messages
761                                ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
762                               
763                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
764                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
765                                        if thisIngestedID in filesPresentList: 
766                                                filesPresentList.remove(thisIngestedID)                 
767                               
768                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
769                #will need to be removed.
770               
771                #only do this if not in single file mode (or else wverything else gets deleted!)               
772                if ((self.indFileToIngest == "") & (feed)):
773                        for item in filesPresentList:
774                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
775                                DeleteRecord(item)
776                                self.deletedFailList.append(item)
777                                self._no_files_deleted += 1
778                       
779                       
780                self._backupAndCleanData()
781               
782                #at this stage put the reporting code in (will work for oai or atom feed)
783                #create a summary file for each data centre ingest
784                data_dir = self._base_dir + "data/"
785                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
786                recOpFile = open(recOpFileName,'w')
787               
788                logging.info("oai_document_ingest processing complete:")
789               
790                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
791                message = "Ingest report for data centre: " + datacentre + "\n"
792                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
793                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
794                message = message + "TOTAL_NUMBER_OF_FILES " + str(numfilesproc+badXmlFiles) + "\n"
795                message = message + "TOTAL_PROCESSED_SUCCESFULLY " + str(numfilesproc) + "\n"
796                message = message + "TOTAL_PROCESSED_UNSUCCESFULLY " + str(badXmlFiles) + "\n" 
797                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
798                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
799                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"           
800                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
801                                               
802                if len(self.updateFailList) > 0:
803                        message = message +  "\nBelow are comments recorded on problem files...\n\n"
804               
805                        for badFile in self.updateFailList:
806                                #recOpFile.write("PROBLEM_FILE " + badFile + "\n")     
807                                message = message +"\nPROBLEM_FILE " + badFile + "\n"
808               
809                if len(self.badXMLfileList.keys()) > 0:
810                        message = message +  "\nBelow are the files that coould not be parsed (i.e. bad XML?)...\n\n"
811                        for badXmlFile in self.badXMLfileList.values():
812                                message = message + "BAD_XML_FILE " + badXmlFile +"\n"
813               
814                       
815                       
816                #update with content from the problem messages recorded
817                message = message + ingestProbMsg + "\n"               
818                         
819                recOpFile.write(message)
820                               
821                return numfilesproc,badXmlFiles, message
822
823
824
825        def _setupXQueries(self):
826                '''
827                Set up the required XQueries
828                - NB, extract the xquery libraries locally for easy reference
829                '''
830                self._xq = ndgResources()
831                for libFile in self._xq.xqlib:
832                        # NB, we don't want the full path to the files - just the filename
833                        fileName = libFile.split('/')[-1]
834                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.