source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6875

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6875
Revision 6875, 30.0 KB checked in by sdonegan, 10 years ago (diff)

Fixed some content bugs and handling of keywords/parameters as well as improve reporting

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.WARNING
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False
68                for arg in args:
69                       
70                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile']                                             
71                       
72                        bits = arg.split('=')
73                        if len(bits) == 2:
74                                if bits[0] == keywordArgs[0]:
75                                        self.setIngestFromDate(bits[1])
76                                elif bits[0] == keywordArgs[1]:
77                                        self.setPollInterval(bits[1])
78                                elif bits[0] == keywordArgs[8]:
79                                        print " - Running with specified config file!"
80                                        self.setOaiConfigFile(bits[1])
81                                        checkConfInArgs = True
82                                elif bits[0] == keywordArgs[2]:
83                                        print " - Running in single file ingestion mode!"
84                                        self.setIndFileToIngest(bits[1])
85                                elif bits[0] not in keywordArgs:
86                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
87                                        sys.exit(2)
88                                else:
89                                        setattr(self, bits[0], bits[1])
90                       
91                if checkConfInArgs is False:
92                        print "\nWARNING: You must now include the ingestConfigFile arguement for successful operation"
93                        sys.exit(2)
94               
95                print self.lineSeparator
96               
97                # NB, this is a slight fudge as cannot get the detailed logging to work
98                # without setting up a new logger here - which means we get two loggers
99                # outputing data. The initial call to logging needs to be tracked down
100                # and configured correctly, so this can be avoided...
101#               self.logger = logging.getLogger()
102#               self.logger.setLevel(loggingLevel)
103
104                # create console handler and set level to debug
105#               ch = logging.StreamHandler()
106#               ch.setLevel(loggingLevel)
107                # create formatter
108#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
109                # add formatter to ch
110#               ch.setFormatter(formatter)
111                # add ch to logger
112#               self.logger.addHandler(ch)
113               
114                return args
115
116
117        def getID(self, filename):
118                '''
119                Gets the identifier out of an input metadata xml record.
120                @param filename - name of document file being processed
121                @return: ID - id to use to refer to the document
122                '''
123                logging.info("Retrieving identifier for metadata record " + filename)
124       
125                xml=file(filename).read()
126               
127                ID = idget(xml)
128               
129                return ID
130       
131       
132        def addFileToPostgresDB(self, filename):
133                '''
134                Add a file to the postgres DB - extracting and storing all the required
135                data in the process
136                @param filename: full path of file to add to postgres DB
137                '''
138                logging.info("Adding file, " + filename + ", to postgres DB")
139               
140                numSlash = len(filename.split('/'))
141                               
142                shortFilename = filename.split('/')[numSlash - 1]
143               
144                self.recordAttemptToIngestDiscoveryID = ""
145               
146                self.ingestProblemMessage = ""
147               
148                #need to generate a local version of the iso data model (first call was to run through all available files
149                #and convert & place into relevant dirs as necessary
150                #generate the ISO object from the converted ISO here now..
151               
152                try:
153                                                                                                                       
154                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
155                       
156                                                       
157                        if self.isoDataModel.createISOdataStructure() is True:
158                                logging.info("ISO extractor worked fine!")
159                       
160                       
161                        elif self.isoDataModel.createISOdataStructure() is True:
162                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
163                                                #sys.exit() # TODO fix these
164                                return
165                        else:
166                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
167                                                #sys.exit()
168                                return
169                except:
170                        logging.error("Could not generate ISO XML!")
171                        return
172               
173                #get the converted ISO into a local var
174                try:
175                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
176                        self.isoXML = file(filename).read()
177                        self.originalXMLdoc = self.isoXML
178                                       
179                except:
180                        logging.warn("Could not extract converted ISO xml to local variable!")
181                        self.isoXML = ''
182                               
183               
184                # first of all create a PostgresRecord - this object represents all the data required
185                # for a DB entry
186                dao = None
187               
188                try:
189                       
190                        #if record is valid (vital elements checked in ExtractISO - validDoc is True if everything ok..
191                       
192                        #record whats bout to be ingested - if a problem this, error should already have been kicked out                                       
193                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
194                       
195                        if self.isoDataModel.validDoc is True:                 
196                                                                               
197                                #record whats attempting to be ingested
198                                record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
199                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
200                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML)                       
201                       
202                                # Now create the data access object to interface to the DB                     
203                                dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
204                       
205                                # Finally, write the new record
206                                # 0 if failed, 1 if update, 2 if create
207                                returnCode = dao.createOrUpdateRecord() 
208                                               
209                                if returnCode == 2:
210                                        self._no_files_ingested += 1
211                               
212                                elif returnCode == 1:
213                                        self._no_files_changed += 1
214                                       
215                        else:
216                                self.ingestProblemMessage= "Content validity problems for record: %s (%s)" %(self.recordAttemptToIngestDiscoveryID,self.isoDataModel.processingMsg)
217                               
218                                logging.warn(self.ingestProblemMessage)
219                               
220                                #now raise error so can record this
221                                raise ValueError,ingestMessage
222                       
223                except:
224                       
225                       
226                        #if error encountered, add to failure lisr
227                        logging.error("Could not update: " + filename)                 
228                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
229                        self.updateFailList.append(originalRecordFilename)
230                       
231                        logging.error("Exception thrown - detail: ")
232                        errors = sys.exc_info()
233                        logging.error(errors)
234                        self._error_messages += "%s\n" %str(errors[1])
235                       
236                        if dao:
237                                logging.info("Removing record and its associated info from DB")
238                                logging.info("- to allow clean ingestion on rerun")
239                                try:
240                                        dao.deleteOriginalRecord()
241                                except:
242                                        logging.error("Problem encountered when removing record: ")
243                                        logging.error(sys.exc_info())
244                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
245
246                        self._no_problem_files += 1
247                       
248                        logging.info("Continue processing other files")
249                       
250                return self.recordAttemptToIngestDiscoveryID
251                       
252       
253        def getProcessingConfig(self,configFilePath):
254               
255                '''
256                Fed up with messing about with hardcoded directory paths. 
257                This method to get relevant values out of the oai_document_ingester.config file
258               
259                Returns a dictionary of values with keys:
260               
261                #directory in which the code resides
262                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
263
264                #base directory in which metadata is extracted to and converted to
265                base_directory /home/badc/discovery_docs/ingestDocs/
266
267                #directory in which to write reports to
268                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
269
270                #path to the passwords file
271                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
272
273                #datacentre config file directory path
274                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
275                '''
276               
277                # Check this file exists; if not, assume an invalid datacentre has been specified
278               
279                if not os.path.isfile(configFilePath):
280                    sys.exit("ERROR: Could not find the processing config file")
281                   
282                processingConfig = {}
283                   
284                processing_config_file = open(configFilePath, "r")
285               
286                for line in processing_config_file.readlines():
287                        words  = string.split(line)
288                        if len(words) == 0:
289                                continue
290                        elif words[0] == 'code_directory':
291                                processingConfig['code_directory'] = words[1]
292                                                                                               
293                        elif words[0] == 'base_directory':
294                                processingConfig['base_directory'] = words[1]
295                               
296                        elif words[0] == 'reporting_directory':
297                                processingConfig['reporting_directory'] = words[1]
298                               
299                        elif words[0] == 'passwords_file':
300                                processingConfig['passwords_file'] = words[1]
301                               
302                        elif words[0] == 'datcentre_configs':
303                                processingConfig['datcentre_configs'] = words[1]
304                                       
305                        elif words[0] == 'NDG_redirect_URL':
306                                processingConfig['NDG_redirect_URL'] = words[1]
307                               
308                        elif words[0] == 'ingestConfig':
309                                if not os.path.isfile(words[1]):
310                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
311                                else:
312                                        processingConfig['ingestConfig'] = words[1]
313                               
314                        elif words[0] == 'acceptedFormats':
315                                processingConfig['acceptedFormats'] = words[1]
316                               
317                        elif words[0] == 'saxonJarFile':
318                                if not os.path.isfile(words[1]):
319                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
320                                else:
321                                        processingConfig['saxonJarFile'] = words[1]
322                        elif words[0] == 'xpathIsoClass':
323                               
324                                #build a dictionary with format as the key                             
325                                isoClass = words[1].split(":")                         
326                               
327                                if 'xpathIsoClass' not in processingConfig.keys():
328                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
329                                else:
330                                        temp = processingConfig['xpathIsoClass']                                       
331                                        temp.update({isoClass[0]:isoClass[1]})
332                                        processingConfig['xpathIsoClass'] = temp
333                               
334                        elif words[0] == 'xqueryStubIsoGen':
335                               
336                                #build a dictionary with format as the key
337                                isoClass = words[1].split(":")
338                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
339                               
340                        elif words[0] == 'xqueryConversions':
341                               
342                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
343                                processingConfig['xqueryConversions'] = words[1].split(",")
344                               
345                        elif words[0] == 'currentMEDIN':
346                               
347                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
348                                processingConfig['currentMEDIN'] = words[1]
349                               
350                        elif words[0] == 'exceptXqConv':
351                               
352                                #build a dictionary with format as the key
353                                xqClass = words[1].split(":")
354                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
355                               
356                        elif words[0] == 'xqDocTypes':
357                               
358                                #build a dictionary with format as the key
359                                xqClass = words[1].split(":")
360                               
361                                if 'xqDocTypes' not in processingConfig.keys():
362                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
363                                else:
364                                        temp = processingConfig['xqDocTypes']                                   
365                                        temp.update({xqClass[0]:xqClass[1]})
366                                        processingConfig['xqDocTypes'] = temp
367                                       
368                processing_config_file.close()
369                       
370                return processingConfig
371       
372       
373        def getConfigDetails(self, datacentre):
374                '''
375                Get the harvested records directory and groups for this datacentre from the
376                datacentre specific config file.  The harvested records directory depends on the
377                datacentres OAI base url, the set and format. These have to be know up-front.
378                The groups denote which 'portal groups' they belong to - for limiting searches to
379                say NERC-only datacentres records.
380                Groups are added to the intermediate MOLES when it is created.
381                @param datacentre: datacentre to use when looking up config file
382                '''
383                # initialise the variables to retrieve from the config file
384                self._harvest_home = ""
385                self._datacentre_groups = ""
386                self._datacentre_format = ""
387                self._datacentre_namespace = ""
388                self._NDG_dataProvider = False
389                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
390               
391               
392                #note changed to production buildout path
393                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
394               
395                #fed up with this rubbish causing problems - use a normal file opener.
396                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
397               
398               
399                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
400                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
401               
402                # Check this file exists; if not, assume an invalid datacentre has been specified
403                if not os.path.isfile(self._datacentre_config_filename):
404                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
405                        "specified (%s) is invalid\n" %datacentre)
406                   
407               
408                datacentre_config_file = open(self._datacentre_config_filename, "r")
409               
410                for line in datacentre_config_file.readlines():
411                        words  = string.split(line)
412                        if len(words) == 0:
413                                continue
414                        elif words[0] == 'host_path':
415                                self._harvest_home = words[1].rstrip()
416                        elif words[0] == 'groups':
417                                self._datacentre_groups = words[1:]
418                        elif words[0] == 'format':
419                                self._datacentre_format = words[1]
420                        elif words[0] == 'namespace':
421                                self._datacentre_namespace = words[1]
422                        elif words[0] == 'NDG_dataProvider':
423                                self._NDG_dataProvider = True
424                        elif words[0] == 'feed':
425                                self._IngestViaFeed = True
426                       
427                datacentre_config_file.close()
428               
429                #stop ingest on this datacentre if process thread is OAI and the config says FEED
430                if ((self.processThread == 'OAI') and self._IngestViaFeed):
431                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
432                                               
433                if self._harvest_home == "":
434                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
435               
436                logging.info("harvested records are in " + self._harvest_home)
437               
438                if self._datacentre_groups == "":
439                    logging.info("No groups/keywords set for datacentre " + datacentre)
440                else:
441                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
442               
443                if self._datacentre_format == "":
444                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
445               
446                logging.info("format being harvested: " + self._datacentre_format)
447               
448                if self._datacentre_namespace == "":
449                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
450               
451                logging.info("datacentre namespace: " + self._datacentre_namespace)
452               
453                if self._NDG_dataProvider:
454                        logging.info("Datacentre classified as an NDG data provider")
455                else:
456                        logging.info("Datacentre is not classified as an NDG data provider")
457                logging.info(self.lineSeparator)
458               
459               
460        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
461                '''
462                Processes/renames the files (changed 08/01/07 to get id from inside file)
463                 - also replace any namespace declarations with a standard one which we know works in NDG
464                 NB, this copies files from the original dir to the discovery dir
465                 @param originals_dir: directory to convert files from
466                 @param discovery_dir: directory in which converted files will end up
467                 @return numfilesproc: counter of number of files processed
468                '''
469                numfilesproc = 0                       
470               
471                self.inputFileOrigFinal = {}
472               
473                logging.info(self.lineSeparator)
474                logging.info("Renaming files:")
475               
476                for filename in os.listdir(originals_dir):
477                       
478                        if not filename.endswith('.xml'):
479                                logging.warning('File %s is not xml format. Not processed'  %(filename))
480                                continue
481                       
482                        original_filename = originals_dir + filename
483                       
484                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
485                        if format == 'DIF_9.4':
486                                                       
487                                logging.info("Converting DIF to stub ISO...")                                   
488                               
489                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
490                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
491                                                                                                               
492                                #gets the metadata id from the xml
493                                metadataID=self.getID(original_filename)
494                                #import pdb
495                                #pdb.set_trace()
496                                                               
497                                repositoryName = self._datacentre_namespace
498                               
499                                #get the name of the file to be used in the xquery
500                                metadataFilename = filename.replace('.xml', '')
501                                                               
502                                #where is the file to be ingested located?
503                                metadataFileLoc = originals_dir
504                               
505                                #generate a new stubISO filename
506                                self.isoFormat= "stubISO"
507                                                               
508                               
509                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
510                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
511                                                                                                                       
512                                #get hold of the xml for the transformed ISO                           
513                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
514                                                               
515                                #create the iso file on the system
516                                tempStubISOfile = original_filename + ".stubISO.xml"
517                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
518                               
519                                #generate the ISO object from the converted ISO here now..
520                                try:
521                                                                                               
522                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
523                               
524                                        if isoDataModel.createISOdataStructure() is True:
525                                                logging.info("ISO extractor worked fine!")
526                       
527                       
528                                        elif isoDataModel.createISOdataStructure() is True:
529                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
530                                                #sys.exit() # TODO fix these
531                                                return
532                                        else:
533                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
534                                                #sys.exit()
535                                                return
536                                except:
537                                        logging.error("Could not generate ISO XML!")
538                                        return
539                               
540                                                               
541                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
542                                self.originalXMLdoc = ''
543                               
544                                try:
545                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
546                                        self.originalXMLdoc = file(original_filename).read()
547                                       
548                                except:
549                                        logging.warn("Could not extract original file to local variable!")
550                               
551                               
552                                #Back to main stream of ISO ingest
553                                if self._NDG_dataProvider:
554                                       
555                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
556                                                               
557                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
558                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + format + "__"+ metadataFilename.replace(":", "-")+".xml"
559                                        new_filename_short = self._datacentre_namespace + "__" + format + "__" + metadataFilename.replace(":", "-")+".xml"
560                               
561                                       
562                                else:
563                                        ident = isoDataModel.datasetID[0][0]
564                                       
565                                        ident = ident.replace(":", "-")
566                                        ident = ident.replace("/", "-")
567                               
568                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
569                                        new_filename_short = self._datacentre_namespace+ "__"+ format + "__"+ ident +".xml"
570                               
571                                       
572                               
573                                #now create this stub ISO file on system so can access it
574                                self.genIsoFile = discovery_dir + new_filename_short   
575                                                               
576                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
577                                #generate the redirect urls - now we have worked out the full discovery ndg name..
578                               
579                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
580                               
581                       
582                       
583                        #all other ISO formats/profiles to deal with.
584                        else:                           
585                                isoFormat = format
586                               
587                                self.originalXMLdoc = "null"
588                               
589                               
590                                #generate the ISO object from the ISO xml..                                                                             
591                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
592                               
593                                if isoDataModel.createISOdataStructure() is True:
594                                                logging.info("ISO extractor worked fine!")
595                       
596                                elif isoDataModel.createISOdataStructure() is True:
597                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
598                                                #sys.exit() # TODO fix these
599                                        return
600                                else:
601                                       
602                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")
603                                       
604                                        #if this is a simple content problem don't return just yet (will catch it next time round so can include in report!
605                                        #if isoDataModel.processingMsg != '':                   
606                                        #       logging.error(" ")
607                                        #       logging.error("Failed with this reported error: %s" %isoDataModel.processingMsg )
608                                        #       logging.error(" ")
609                                       
610                                        return
611                               
612                                #Back to main stream of ISO ingest
613                                if self._NDG_dataProvider:
614                                                               
615                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
616                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
617                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
618                               
619                                else:
620                                       
621                                        ident = isoDataModel.datasetID[0][0]
622                               
623                                        ident = ident.replace(":", "-")
624                                        ident = ident.replace("/", "-")
625                               
626                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
627                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
628                               
629                                #now create this stub ISO file on system so can access it
630                                self.genIsoFile = discovery_dir + new_filename_short   
631                               
632                                #generate the redirect urls - now we have worked out the full discovery ndg name..
633                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
634                               
635                                #get the converted ISO into a local var
636                                try:
637                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
638                                        self.isoXML = file(self.genIsoFile).read()
639                                        self.originalXMLdoc = self.isoXML
640                                       
641                                except:
642                                        logging.warn("Could not extract converted ISO xml to local variable!")
643                                        self.isoXML = ''
644                                       
645                        logging.info("original file = " + original_filename)
646                        logging.info("newfile = " + new_filename)
647                               
648                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
649                        #this links a derived filename in processing dir with original filename
650                        #get basic filename from the path+filename passed to this function to use as key               
651                        self.inputFileOrigFinal[new_filename_short]=filename
652                       
653                       
654                        numfilesproc += 1
655               
656                       
657                logging.info("File renaming and converting completed")
658                logging.info(self.lineSeparator)
659               
660                       
661                return numfilesproc
662       
663       
664        def _getPostgresDBConnection(self):
665                '''
666                Get the default postgres DB connection - by reading in data from the db config file
667                '''
668               
669                logging.debug("Setting up connection to postgres DB")
670                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
671               
672               
673                self.pgc = pgc(configFile = self._databaseConfigurationFile)
674                logging.info("Postgres DB connection now set up")
675
676
677
678        def     _backupAndCleanData(self):
679                '''
680                Back up ingested data for specified data centre, then clean any of
681                the ingest dirs
682                '''
683                logging.info("Backing up ingested data")
684                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
685                  strftime("%y%m%d_%H%M") + "_originals/"
686               
687                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
688                logging.info("Data backed up - now clearing ingest directories")
689                #Clear out the original harvest records area and discovery dir
690                #FileUtilities.cleanDir(self.originals_dir)
691                #FileUtilities.cleanDir(self.discovery_dir)
692                logging.info("Ingest directories cleared")
693
694
695        def     _setupDataCentreDirs(self):
696                '''
697                Set up directories appropriate for the current data centre
698                '''
699                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
700               
701               
702                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
703               
704                # the following dirs define where the specific documents should go
705                self.originals_dir = data_dir + "/oai/originals/" # copy of original
706                self.discovery_dir = data_dir + "/discovery/"
707                #self.stubISO_dir = data_dir + "/stub_iso/"
708               
709                # Create/clear the 'in' and 'out' directories
710                FileUtilities.setUpDir(self.originals_dir)
711                FileUtilities.setUpDir(self.discovery_dir)
712               
713                logging.info("Ingest directories for data centre set up")
714
715
716        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
717                '''
718                Convert files from originals dir to discovery one, then
719                ingest, backup and clean
720                @param originals_dir: directory to ingest docs from
721                @param discovery_dir: directory to use to process ingested docs
722                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
723                '''
724               
725               
726                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
727               
728                filenames = os.listdir(self.discovery_dir)
729               
730                ingestProbMsg = ""
731                               
732                #generate a list of files ALREADY in database so can compare with what has been ingested
733                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
734                               
735                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
736                filesPresentList=[]
737       
738               
739                if filePresentListArr:
740                        for fileArr in filePresentListArr:
741                                filesPresentList.append(fileArr[0])
742                               
743                                #TODO - is above relevant - duplicating functionlaity?
744       
745                #create list to to hold files ingest failed on.
746                self.updateFailList = []
747                self.deletedFailList = []                               
748               
749                for filename in filenames:
750                       
751                        fullPath = self.discovery_dir + filename
752                       
753                        if os.path.isfile(fullPath):
754                                       
755                                thisIngestedID = self.addFileToPostgresDB(fullPath)
756                               
757                                #record all problem messages
758                                ingestProbMsg = ingestProbMsg + self.ingestProblemMessage + "\n"
759                               
760                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
761                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
762                                        if thisIngestedID in filesPresentList: 
763                                                filesPresentList.remove(thisIngestedID)                 
764                               
765                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
766                #will need to be removed.
767               
768                #only do this if not in single file mode (or else wverything else gets deleted!)               
769                if ((self.indFileToIngest == "") & (feed)):
770                        for item in filesPresentList:
771                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
772                                DeleteRecord(item)
773                                self.deletedFailList.append(item)
774                                self._no_files_deleted += 1
775                       
776                       
777                self._backupAndCleanData()
778               
779                #at this stage put the reporting code in (will work for oai or atom feed)
780                #create a summary file for each data centre ingest
781                data_dir = self._base_dir + "data/"
782                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
783                recOpFile = open(recOpFileName,'w')
784               
785                logging.info("oai_document_ingest processing complete:")
786               
787                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
788                message = "Ingest report for data centre: " + datacentre + "\n"
789                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
790                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
791                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
792                message = message + "INGESTED_Created " + str(self._no_files_ingested)  + "\n"
793                message = message + "INGESTED_Updated " + str(self._no_files_changed)  + "\n"
794                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
795                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
796                               
797                for badFile in self.updateFailList:
798                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")     
799                        message = message +"PROBLEM_FILE " + badFile + "\n"
800                       
801                message = message +  "\nBelow are comments recorded on problem files...\n\n"
802                       
803                #update with content from the problem messages recorded
804                message = message + ingestProbMsg + "\n"               
805                         
806                recOpFile.write(message)
807                               
808                return numfilesproc, message
809
810
811
812        def _setupXQueries(self):
813                '''
814                Set up the required XQueries
815                - NB, extract the xquery libraries locally for easy reference
816                '''
817                self._xq = ndgResources()
818                for libFile in self._xq.xqlib:
819                        # NB, we don't want the full path to the files - just the filename
820                        fileName = libFile.split('/')[-1]
821                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.