source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6685

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6685
Revision 6685, 29.0 KB checked in by sdonegan, 11 years ago (diff)

roundup updates - ingestion working well

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.WARNING
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False
68                for arg in args:
69                       
70                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile']                                             
71                       
72                        bits = arg.split('=')
73                        if len(bits) == 2:
74                                if bits[0] == keywordArgs[0]:
75                                        self.setIngestFromDate(bits[1])
76                                elif bits[0] == keywordArgs[1]:
77                                        self.setPollInterval(bits[1])
78                                elif bits[0] == keywordArgs[8]:
79                                        print " - Running with specified config file!"
80                                        self.setOaiConfigFile(bits[1])
81                                        checkConfInArgs = True
82                                elif bits[0] == keywordArgs[2]:
83                                        print " - Running in single file ingestion mode!"
84                                        self.setIndFileToIngest(bits[1])
85                                elif bits[0] not in keywordArgs:
86                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
87                                        sys.exit(2)
88                                else:
89                                        setattr(self, bits[0], bits[1])
90                       
91                if checkConfInArgs is False:
92                        print "\nWARNING: You must now include the ingestConfigFile arguement for successful operation"
93                        sys.exit(2)
94               
95                print self.lineSeparator
96               
97                # NB, this is a slight fudge as cannot get the detailed logging to work
98                # without setting up a new logger here - which means we get two loggers
99                # outputing data. The initial call to logging needs to be tracked down
100                # and configured correctly, so this can be avoided...
101#               self.logger = logging.getLogger()
102#               self.logger.setLevel(loggingLevel)
103
104                # create console handler and set level to debug
105#               ch = logging.StreamHandler()
106#               ch.setLevel(loggingLevel)
107                # create formatter
108#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
109                # add formatter to ch
110#               ch.setFormatter(formatter)
111                # add ch to logger
112#               self.logger.addHandler(ch)
113               
114                return args
115
116
117        def getID(self, filename):
118                '''
119                Gets the identifier out of an input metadata xml record.
120                @param filename - name of document file being processed
121                @return: ID - id to use to refer to the document
122                '''
123                logging.info("Retrieving identifier for metadata record " + filename)
124       
125                xml=file(filename).read()
126               
127                ID = idget(xml)
128               
129                return ID
130       
131       
132        def addFileToPostgresDB(self, filename):
133                '''
134                Add a file to the postgres DB - extracting and storing all the required
135                data in the process
136                @param filename: full path of file to add to postgres DB
137                '''
138                logging.info("Adding file, " + filename + ", to postgres DB")
139               
140                numSlash = len(filename.split('/'))
141                               
142                shortFilename = filename.split('/')[numSlash - 1]
143               
144                self.recordAttemptToIngestDiscoveryID = ""
145               
146                #need to generate a local version of the iso data model (first call was to run through all available files
147                #and convert & place into relevant dirs as necessary
148                #generate the ISO object from the converted ISO here now..
149               
150                try:
151                                                                                               
152                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
153                               
154                        if self.isoDataModel.createISOdataStructure() is True:
155                                logging.info("ISO extractor worked fine!")
156                       
157                       
158                        elif self.isoDataModel.createISOdataStructure() is True:
159                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
160                                                #sys.exit() # TODO fix these
161                                return
162                        else:
163                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
164                                                #sys.exit()
165                                return
166                except:
167                        logging.error("Could not generate ISO XML!")
168                        return
169                               
170               
171                # first of all create a PostgresRecord - this object represents all the data required
172                # for a DB entry
173                dao = None
174               
175                try:
176                               
177                        #record whats bout to be ingestd
178                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
179                                               
180                        #In new ingest system all vals parsed from iso xml are returned as either lists of dictionaries
181                       
182                        #record whats attempting to be ingested
183                        record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
184                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
185                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML)                       
186                       
187                        # Now create the data access object to interface to the DB                     
188                        dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
189                       
190                        # Finally, write the new record
191                        # 0 if failed, 1 if update, 2 if create
192                        returnCode = dao.createOrUpdateRecord() 
193                                               
194                        if returnCode == 2:
195                                self._no_files_ingested += 1
196                               
197                        elif returnCode == 1:
198                                self._no_files_changed += 1
199                               
200                       
201                except:
202                       
203                       
204                        #if error encountered, add to failure lisr
205                        logging.error("Could not update: " + filename)                 
206                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
207                        self.updateFailList.append(originalRecordFilename)
208                       
209                        logging.error("Exception thrown - detail: ")
210                        errors = sys.exc_info()
211                        logging.error(errors)
212                        self._error_messages += "%s\n" %str(errors[1])
213                       
214                        if dao:
215                                logging.info("Removing record and its associated info from DB")
216                                logging.info("- to allow clean ingestion on rerun")
217                                try:
218                                        dao.deleteOriginalRecord()
219                                except:
220                                        logging.error("Problem encountered when removing record: ")
221                                        logging.error(sys.exc_info())
222                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
223
224                        self._no_problem_files += 1
225                       
226                        logging.info("Continue processing other files")
227                       
228                return self.recordAttemptToIngestDiscoveryID
229                       
230       
231        def getProcessingConfig(self,configFilePath):
232               
233                '''
234                Fed up with messing about with hardcoded directory paths. 
235                This method to get relevant values out of the oai_document_ingester.config file
236               
237                Returns a dictionary of values with keys:
238               
239                #directory in which the code resides
240                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
241
242                #base directory in which metadata is extracted to and converted to
243                base_directory /home/badc/discovery_docs/ingestDocs/
244
245                #directory in which to write reports to
246                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
247
248                #path to the passwords file
249                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
250
251                #datacentre config file directory path
252                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
253                '''
254               
255                # Check this file exists; if not, assume an invalid datacentre has been specified
256               
257                if not os.path.isfile(configFilePath):
258                    sys.exit("ERROR: Could not find the processing config file")
259                   
260                processingConfig = {}
261                   
262                processing_config_file = open(configFilePath, "r")
263               
264                for line in processing_config_file.readlines():
265                        words  = string.split(line)
266                        if len(words) == 0:
267                                continue
268                        elif words[0] == 'code_directory':
269                                processingConfig['code_directory'] = words[1]
270                                                                                               
271                        elif words[0] == 'base_directory':
272                                processingConfig['base_directory'] = words[1]
273                               
274                        elif words[0] == 'reporting_directory':
275                                processingConfig['reporting_directory'] = words[1]
276                               
277                        elif words[0] == 'passwords_file':
278                                processingConfig['passwords_file'] = words[1]
279                               
280                        elif words[0] == 'datcentre_configs':
281                                processingConfig['datcentre_configs'] = words[1]
282                                       
283                        elif words[0] == 'NDG_redirect_URL':
284                                processingConfig['NDG_redirect_URL'] = words[1]
285                               
286                        elif words[0] == 'ingestConfig':
287                                if not os.path.isfile(words[1]):
288                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
289                                else:
290                                        processingConfig['ingestConfig'] = words[1]
291                               
292                        elif words[0] == 'acceptedFormats':
293                                processingConfig['acceptedFormats'] = words[1]
294                               
295                        elif words[0] == 'saxonJarFile':
296                                if not os.path.isfile(words[1]):
297                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
298                                else:
299                                        processingConfig['saxonJarFile'] = words[1]
300                        elif words[0] == 'xpathIsoClass':
301                               
302                                #build a dictionary with format as the key                             
303                                isoClass = words[1].split(":")                         
304                               
305                                if 'xpathIsoClass' not in processingConfig.keys():
306                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
307                                else:
308                                        temp = processingConfig['xpathIsoClass']                                       
309                                        temp.update({isoClass[0]:isoClass[1]})
310                                        processingConfig['xpathIsoClass'] = temp
311                               
312                        elif words[0] == 'xqueryStubIsoGen':
313                               
314                                #build a dictionary with format as the key
315                                isoClass = words[1].split(":")
316                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
317                               
318                        elif words[0] == 'xqueryConversions':
319                               
320                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
321                                processingConfig['xqueryConversions'] = words[1].split(",")
322                               
323                        elif words[0] == 'currentMEDIN':
324                               
325                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
326                                processingConfig['currentMEDIN'] = words[1]
327                               
328                        elif words[0] == 'exceptXqConv':
329                               
330                                #build a dictionary with format as the key
331                                xqClass = words[1].split(":")
332                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
333                               
334                        elif words[0] == 'xqDocTypes':
335                               
336                                #build a dictionary with format as the key
337                                xqClass = words[1].split(":")
338                               
339                                if 'xqDocTypes' not in processingConfig.keys():
340                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
341                                else:
342                                        temp = processingConfig['xqDocTypes']                                   
343                                        temp.update({xqClass[0]:xqClass[1]})
344                                        processingConfig['xqDocTypes'] = temp
345                                       
346                processing_config_file.close()
347                       
348                return processingConfig
349       
350       
351        def getConfigDetails(self, datacentre):
352                '''
353                Get the harvested records directory and groups for this datacentre from the
354                datacentre specific config file.  The harvested records directory depends on the
355                datacentres OAI base url, the set and format. These have to be know up-front.
356                The groups denote which 'portal groups' they belong to - for limiting searches to
357                say NERC-only datacentres records.
358                Groups are added to the intermediate MOLES when it is created.
359                @param datacentre: datacentre to use when looking up config file
360                '''
361                # initialise the variables to retrieve from the config file
362                self._harvest_home = ""
363                self._datacentre_groups = ""
364                self._datacentre_format = ""
365                self._datacentre_namespace = ""
366                self._NDG_dataProvider = False
367                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
368               
369               
370                #note changed to production buildout path
371                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
372               
373                #fed up with this rubbish causing problems - use a normal file opener.
374                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
375               
376               
377                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
378                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
379               
380                # Check this file exists; if not, assume an invalid datacentre has been specified
381                if not os.path.isfile(self._datacentre_config_filename):
382                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
383                        "specified (%s) is invalid\n" %datacentre)
384                   
385               
386                datacentre_config_file = open(self._datacentre_config_filename, "r")
387               
388                for line in datacentre_config_file.readlines():
389                        words  = string.split(line)
390                        if len(words) == 0:
391                                continue
392                        elif words[0] == 'host_path':
393                                self._harvest_home = words[1].rstrip()
394                        elif words[0] == 'groups':
395                                self._datacentre_groups = words[1:]
396                        elif words[0] == 'format':
397                                self._datacentre_format = words[1]
398                        elif words[0] == 'namespace':
399                                self._datacentre_namespace = words[1]
400                        elif words[0] == 'NDG_dataProvider':
401                                self._NDG_dataProvider = True
402                        elif words[0] == 'feed':
403                                self._IngestViaFeed = True
404                       
405                datacentre_config_file.close()
406               
407                #stop ingest on this datacentre if process thread is OAI and the config says FEED
408                if ((self.processThread == 'OAI') and self._IngestViaFeed):
409                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
410                                               
411                if self._harvest_home == "":
412                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
413               
414                logging.info("harvested records are in " + self._harvest_home)
415               
416                if self._datacentre_groups == "":
417                    logging.info("No groups/keywords set for datacentre " + datacentre)
418                else:
419                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
420               
421                if self._datacentre_format == "":
422                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
423               
424                logging.info("format being harvested: " + self._datacentre_format)
425               
426                if self._datacentre_namespace == "":
427                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
428               
429                logging.info("datacentre namespace: " + self._datacentre_namespace)
430               
431                if self._NDG_dataProvider:
432                        logging.info("Datacentre classified as an NDG data provider")
433                else:
434                        logging.info("Datacentre is not classified as an NDG data provider")
435                logging.info(self.lineSeparator)
436               
437               
438        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
439                '''
440                Processes/renames the files (changed 08/01/07 to get id from inside file)
441                 - also replace any namespace declarations with a standard one which we know works in NDG
442                 NB, this copies files from the original dir to the discovery dir
443                 @param originals_dir: directory to convert files from
444                 @param discovery_dir: directory in which converted files will end up
445                 @return numfilesproc: counter of number of files processed
446                '''
447                numfilesproc = 0                       
448               
449                self.inputFileOrigFinal = {}
450               
451                logging.info(self.lineSeparator)
452                logging.info("Renaming files:")
453               
454                for filename in os.listdir(originals_dir):
455                       
456                        if not filename.endswith('.xml'):
457                                logging.warning('File %s is not xml format. Not processed'  %(filename))
458                                continue
459                       
460                        original_filename = originals_dir + filename
461                       
462                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
463                        if format == 'DIF_9.4':
464                                                       
465                                logging.info("Converting DIF to stub ISO...")                                   
466                               
467                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
468                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
469                                                                                                               
470                                #gets the metadata id from the xml
471                                metadataID=self.getID(original_filename)
472                                                               
473                                repositoryName = self._datacentre_namespace
474                               
475                                #get the name of the file to be used in the xquery
476                                metadataFilename = filename.replace('.xml', '')
477                                                               
478                                #where is the file to be ingested located?
479                                metadataFileLoc = originals_dir
480                               
481                                #generate a new stubISO filename
482                                self.isoFormat= "stubISO"
483                                                               
484                               
485                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
486                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
487                                                                                                                       
488                                #get hold of the xml for the transformed ISO                           
489                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
490                                                               
491                                #create the iso file on the system
492                                tempStubISOfile = original_filename + ".stubISO.xml"
493                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
494                               
495                                #generate the ISO object from the converted ISO here now..
496                                try:
497                                                                                               
498                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
499                               
500                                        if isoDataModel.createISOdataStructure() is True:
501                                                logging.info("ISO extractor worked fine!")
502                       
503                       
504                                        elif isoDataModel.createISOdataStructure() is True:
505                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
506                                                #sys.exit() # TODO fix these
507                                                return
508                                        else:
509                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
510                                                #sys.exit()
511                                                return
512                                except:
513                                        logging.error("Could not generate ISO XML!")
514                                        return
515                               
516                                                               
517                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
518                                self.originalXMLdoc = ''
519                               
520                                try:
521                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
522                                        self.originalXMLdoc = file(original_filename).read()
523                                       
524                                except:
525                                        logging.warn("Could not extract original file to local variable!")
526                               
527                               
528                                #Back to main stream of ISO ingest
529                                if self._NDG_dataProvider:
530                                       
531                                        #to ensure "stubISO" doesnt make it into the filename on system and in the db, use the self.
532                                                               
533                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
534                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + self._currentMedinStandard + "__"+ metadataFilename.replace(":", "-")+".xml"
535                                        new_filename_short = self._datacentre_namespace + "__" + self._currentMedinStandard + "__" + metadataFilename.replace(":", "-")+".xml"
536                               
537                                       
538                                else:
539                                        ident = isoDataModel.datasetID[0][0]
540                                       
541                                        ident = ident.replace(":", "-")
542                                        ident = ident.replace("/", "-")
543                               
544                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ self._currentMedinStandard + "__"+ ident +".xml"
545                                        new_filename_short = self._datacentre_namespace+ "__"+ self._currentMedinStandard + "__"+ ident +".xml"
546                               
547                                       
548                               
549                                #now create this stub ISO file on system so can access it
550                                self.genIsoFile = discovery_dir + new_filename_short   
551                                                               
552                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
553                                #generate the redirect urls - now we have worked out the full discovery ndg name..
554                               
555                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
556                               
557                       
558                       
559                        #all other ISO formats/profiles to deal with.
560                        else:                           
561                                isoFormat = format
562                               
563                                self.originalXMLdoc = "null"
564                               
565                               
566                                #generate the ISO object from the ISO xml..                                                                             
567                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
568                               
569                                if isoDataModel.createISOdataStructure() is True:
570                                                logging.info("ISO extractor worked fine!")
571                       
572                                elif isoDataModel.createISOdataStructure() is True:
573                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
574                                                #sys.exit() # TODO fix these
575                                        return
576                                else:
577                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
578                                                #sys.exit()
579                                        return
580                               
581                                #Back to main stream of ISO ingest
582                                if self._NDG_dataProvider:
583                                                               
584                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
585                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
586                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
587                               
588                                else:
589                                       
590                                        ident = isoDataModel.datasetID[0][0]
591                               
592                                        ident = ident.replace(":", "-")
593                                        ident = ident.replace("/", "-")
594                               
595                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
596                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
597                               
598                                #now create this stub ISO file on system so can access it
599                                self.genIsoFile = discovery_dir + new_filename_short   
600                               
601                                #generate the redirect urls - now we have worked out the full discovery ndg name..
602                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
603                               
604                                #get the converted ISO into a local var
605                                try:
606                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
607                                        self.isoXML = file(self.genIsoFile).read()
608                                        self.originalXMLdoc = self.isoXML
609                                       
610                                except:
611                                        logging.warn("Could not extract converted ISO xml to local variable!")
612                                        self.isoXML = ''
613                                       
614                        logging.info("original file = " + original_filename)
615                        logging.info("newfile = " + new_filename)
616                               
617                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
618                        #this links a derived filename in processing dir with original filename
619                        #get basic filename from the path+filename passed to this function to use as key               
620                        self.inputFileOrigFinal[new_filename_short]=filename
621                       
622                        '''
623                        TODO: do we need this namespace corrector anymore as not using MOLES?
624               
625                        # now correct any namespace issues
626                        try:
627                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
628                        except Exception, detail:
629                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
630                                logging.error("Detail: %s" %detail)
631                                logging.info("Continue with next file")
632                                continue
633                        '''
634                        numfilesproc += 1
635               
636                       
637                logging.info("File renaming and converting completed")
638                logging.info(self.lineSeparator)
639               
640                       
641                return numfilesproc
642       
643       
644        def _getPostgresDBConnection(self):
645                '''
646                Get the default postgres DB connection - by reading in data from the db config file
647                '''
648               
649                logging.debug("Setting up connection to postgres DB")
650                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
651               
652               
653                self.pgc = pgc(configFile = self._databaseConfigurationFile)
654                logging.info("Postgres DB connection now set up")
655
656
657
658        def     _backupAndCleanData(self):
659                '''
660                Back up ingested data for specified data centre, then clean any of
661                the ingest dirs
662                '''
663                logging.info("Backing up ingested data")
664                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
665                  strftime("%y%m%d_%H%M") + "_originals/"
666               
667                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
668                logging.info("Data backed up - now clearing ingest directories")
669                #Clear out the original harvest records area and discovery dir
670                #FileUtilities.cleanDir(self.originals_dir)
671                #FileUtilities.cleanDir(self.discovery_dir)
672                logging.info("Ingest directories cleared")
673
674
675        def     _setupDataCentreDirs(self):
676                '''
677                Set up directories appropriate for the current data centre
678                '''
679                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
680               
681               
682                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
683               
684                # the following dirs define where the specific documents should go
685                self.originals_dir = data_dir + "/oai/originals/" # copy of original
686                self.discovery_dir = data_dir + "/discovery/"
687                #self.stubISO_dir = data_dir + "/stub_iso/"
688               
689                # Create/clear the 'in' and 'out' directories
690                FileUtilities.setUpDir(self.originals_dir)
691                FileUtilities.setUpDir(self.discovery_dir)
692               
693                logging.info("Ingest directories for data centre set up")
694
695
696        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
697                '''
698                Convert files from originals dir to discovery one, then
699                ingest, backup and clean
700                @param originals_dir: directory to ingest docs from
701                @param discovery_dir: directory to use to process ingested docs
702                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
703                '''
704               
705               
706                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
707               
708                filenames = os.listdir(self.discovery_dir)
709                               
710                #generate a list of files ALREADY in database so can compare with what has been ingested
711                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
712                               
713                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
714                filesPresentList=[]
715       
716               
717                if filePresentListArr:
718                        for fileArr in filePresentListArr:
719                                filesPresentList.append(fileArr[0])
720                               
721                                #TODO - is above relevant - duplicating functionlaity?
722       
723                #create list to to hold files ingest failed on.
724                self.updateFailList = []
725                self.deletedFailList = []                               
726               
727                for filename in filenames:
728                       
729                        fullPath = self.discovery_dir + filename
730                       
731                        if os.path.isfile(fullPath):
732                                       
733                                thisIngestedID = self.addFileToPostgresDB(fullPath)
734                               
735                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
736                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
737                                        if thisIngestedID in filesPresentList: 
738                                                filesPresentList.remove(thisIngestedID)                 
739                               
740                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
741                #will need to be removed.
742               
743                #only do this if not in single file mode (or else wverything else gets deleted!)               
744                if ((self.indFileToIngest == "") & (feed)):
745                        for item in filesPresentList:
746                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
747                                DeleteRecord(item)
748                                self.deletedFailList.append(item)
749                                self._no_files_deleted += 1
750                       
751                       
752                self._backupAndCleanData()
753               
754                #at this stage put the reporting code in (will work for oai or atom feed)
755                #create a summary file for each data centre ingest
756                data_dir = self._base_dir + "data/"
757                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
758                recOpFile = open(recOpFileName,'w')
759               
760                logging.info("oai_document_ingest processing complete:")
761               
762                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
763                message = "Ingest report for data centre: " + datacentre + "\n"
764                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
765                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
766                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
767                message = message + "INGESTED_Created " + str(self._no_files_changed)  + "\n"
768                message = message + "INGESTED_Updated " + str(self._no_files_ingested)  + "\n"
769                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
770                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
771               
772               
773                for badFile in self.updateFailList:
774                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
775                        message = message +"PROBLEM_FILE " + badFile + "\n"
776                         
777                recOpFile.write(message)
778                               
779                return numfilesproc, message
780
781
782
783        def _setupXQueries(self):
784                '''
785                Set up the required XQueries
786                - NB, extract the xquery libraries locally for easy reference
787                '''
788                self._xq = ndgResources()
789                for libFile in self._xq.xqlib:
790                        # NB, we don't want the full path to the files - just the filename
791                        fileName = libFile.split('/')[-1]
792                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.