source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6660

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6660
Revision 6660, 28.8 KB checked in by sdonegan, 11 years ago (diff)

Update includes latest changes to allow full MEDIN xml and DIF xml ingest - now updates stubISO as proper MEDIN format in transformed docs table

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25class AbstractDocumentIngester(object):
26        '''
27        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
28        - including running the various transforms and parsings to get all doc types and spatiotemporal
29        data in the correct form in the DB
30        '''
31        lineSeparator = "------------------------------"
32               
33        # The directory to put things for a tape backup (should already exist)
34        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
35       
36        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
37               
38        def _setupCmdLineOptions(self):
39                '''
40                Determine the logging level to use and configure this appropriately
41                @return args: any input arguments - excluding options
42                '''
43               
44                # check for verbose option
45                try:
46                        opts, args = getopt.getopt(sys.argv[1:], "vd")
47                except getopt.GetoptError, err:
48                    # print help information and exit:
49                    print str(err) # will print something like "option -a not recognized"
50                    sys.exit(2)
51                   
52                if len(args) < 1:
53                        self.usage()
54                   
55                loggingLevel = logging.WARNING
56                for o, a in opts:
57                    if o == "-v":
58                        print " - Verbose mode ON"
59                        loggingLevel = logging.INFO
60                    elif o == "-d":
61                        print " - Debug mode ON"
62                        loggingLevel = logging.DEBUG
63                   
64               
65                # set up any keywords on the object
66                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
67                checkConfInArgs = False
68                for arg in args:
69                       
70                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile']                                             
71                       
72                        bits = arg.split('=')
73                        if len(bits) == 2:
74                                if bits[0] == keywordArgs[0]:
75                                        self.setIngestFromDate(bits[1])
76                                elif bits[0] == keywordArgs[1]:
77                                        self.setPollInterval(bits[1])
78                                elif bits[0] == keywordArgs[8]:
79                                        print " - Running with specified config file!"
80                                        self.setOaiConfigFile(bits[1])
81                                        checkConfInArgs = True
82                                elif bits[0] == keywordArgs[2]:
83                                        print " - Running in single file ingestion mode!"
84                                        self.setIndFileToIngest(bits[1])
85                                elif bits[0] not in keywordArgs:
86                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
87                                        sys.exit(2)
88                                else:
89                                        setattr(self, bits[0], bits[1])
90                       
91                if checkConfInArgs is False:
92                        print "\nWARNING: You must now include the ingestConfigFile arguement for successful operation"
93                        sys.exit(2)
94               
95                print self.lineSeparator
96               
97                # NB, this is a slight fudge as cannot get the detailed logging to work
98                # without setting up a new logger here - which means we get two loggers
99                # outputing data. The initial call to logging needs to be tracked down
100                # and configured correctly, so this can be avoided...
101#               self.logger = logging.getLogger()
102#               self.logger.setLevel(loggingLevel)
103
104                # create console handler and set level to debug
105#               ch = logging.StreamHandler()
106#               ch.setLevel(loggingLevel)
107                # create formatter
108#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
109                # add formatter to ch
110#               ch.setFormatter(formatter)
111                # add ch to logger
112#               self.logger.addHandler(ch)
113               
114                return args
115
116
117        def getID(self, filename):
118                '''
119                Gets the identifier out of an input metadata xml record.
120                @param filename - name of document file being processed
121                @return: ID - id to use to refer to the document
122                '''
123                logging.info("Retrieving identifier for metadata record " + filename)
124       
125                xml=file(filename).read()
126               
127                ID = idget(xml)
128               
129                return ID
130       
131       
132        def addFileToPostgresDB(self, filename):
133                '''
134                Add a file to the postgres DB - extracting and storing all the required
135                data in the process
136                @param filename: full path of file to add to postgres DB
137                '''
138                logging.info("Adding file, " + filename + ", to postgres DB")
139               
140                numSlash = len(filename.split('/'))
141                               
142                shortFilename = filename.split('/')[numSlash - 1]
143               
144                self.recordAttemptToIngestDiscoveryID = ""
145               
146                #need to generate a local version of the iso data model (first call was to run through all available files
147                #and convert & place into relevant dirs as necessary
148                #generate the ISO object from the converted ISO here now..
149               
150                try:
151                                                                                               
152                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
153                               
154                        if self.isoDataModel.createISOdataStructure() is True:
155                                logging.info("ISO extractor worked fine!")
156                       
157                       
158                        elif self.isoDataModel.createISOdataStructure() is True:
159                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
160                                                #sys.exit() # TODO fix these
161                                return
162                        else:
163                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
164                                                #sys.exit()
165                                return
166                except:
167                        logging.error("Could not generate ISO XML!")
168                        return
169                               
170               
171                # first of all create a PostgresRecord - this object represents all the data required
172                # for a DB entry
173                dao = None
174               
175                try:
176                               
177                        #record whats bout to be ingestd
178                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
179                                               
180                        #In new ingest system all vals parsed from iso xml are returned as either lists of dictionaries
181                       
182                        #record whats attempting to be ingested
183                        record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
184                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
185                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, self._currentMedinStandard, stubIso = self.isoXML)                       
186                       
187                        # Now create the data access object to interface to the DB                     
188                        dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
189                       
190                        # Finally, write the new record
191                        # 0 if failed, 1 if update, 2 if create
192                        returnCode = dao.createOrUpdateRecord() 
193                                               
194                        if returnCode == 2:
195                                self._no_files_ingested += 1
196                               
197                        elif returnCode == 1:
198                                self._no_files_changed += 1
199                               
200                       
201                except:
202                       
203                       
204                        #if error encountered, add to failure lisr
205                        logging.error("Could not update: " + filename)                 
206                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
207                        self.updateFailList.append(originalRecordFilename)
208                       
209                        logging.error("Exception thrown - detail: ")
210                        errors = sys.exc_info()
211                        logging.error(errors)
212                        self._error_messages += "%s\n" %str(errors[1])
213                       
214                        if dao:
215                                logging.info("Removing record and its associated info from DB")
216                                logging.info("- to allow clean ingestion on rerun")
217                                try:
218                                        dao.deleteOriginalRecord()
219                                except:
220                                        logging.error("Problem encountered when removing record: ")
221                                        logging.error(sys.exc_info())
222                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
223
224                        self._no_problem_files += 1
225                       
226                        logging.info("Continue processing other files")
227                       
228                return self.recordAttemptToIngestDiscoveryID
229                       
230       
231        def getProcessingConfig(self,configFilePath):
232               
233                '''
234                Fed up with messing about with hardcoded directory paths. 
235                This method to get relevant values out of the oai_document_ingester.config file
236               
237                Returns a dictionary of values with keys:
238               
239                #directory in which the code resides
240                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
241
242                #base directory in which metadata is extracted to and converted to
243                base_directory /home/badc/discovery_docs/ingestDocs/
244
245                #directory in which to write reports to
246                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
247
248                #path to the passwords file
249                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
250
251                #datacentre config file directory path
252                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
253                '''
254               
255                # Check this file exists; if not, assume an invalid datacentre has been specified
256               
257                if not os.path.isfile(configFilePath):
258                    sys.exit("ERROR: Could not find the processing config file")
259                   
260                processingConfig = {}
261                   
262                processing_config_file = open(configFilePath, "r")
263               
264                for line in processing_config_file.readlines():
265                        words  = string.split(line)
266                        if len(words) == 0:
267                                continue
268                        elif words[0] == 'code_directory':
269                                processingConfig['code_directory'] = words[1]
270                                                                                               
271                        elif words[0] == 'base_directory':
272                                processingConfig['base_directory'] = words[1]
273                               
274                        elif words[0] == 'reporting_directory':
275                                processingConfig['reporting_directory'] = words[1]
276                               
277                        elif words[0] == 'passwords_file':
278                                processingConfig['passwords_file'] = words[1]
279                               
280                        elif words[0] == 'datcentre_configs':
281                                processingConfig['datcentre_configs'] = words[1]
282                                       
283                        elif words[0] == 'NDG_redirect_URL':
284                                processingConfig['NDG_redirect_URL'] = words[1]
285                               
286                        elif words[0] == 'ingestConfig':
287                                if not os.path.isfile(words[1]):
288                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
289                                else:
290                                        processingConfig['ingestConfig'] = words[1]
291                               
292                        elif words[0] == 'acceptedFormats':
293                                processingConfig['acceptedFormats'] = words[1]
294                               
295                        elif words[0] == 'saxonJarFile':
296                                if not os.path.isfile(words[1]):
297                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
298                                else:
299                                        processingConfig['saxonJarFile'] = words[1]
300                        elif words[0] == 'xpathIsoClass':
301                               
302                                #build a dictionary with format as the key                             
303                                isoClass = words[1].split(":")                         
304                               
305                                if 'xpathIsoClass' not in processingConfig.keys():
306                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
307                                else:
308                                        temp = processingConfig['xpathIsoClass']                                       
309                                        temp.update({isoClass[0]:isoClass[1]})
310                                        processingConfig['xpathIsoClass'] = temp
311                               
312                        elif words[0] == 'xqueryStubIsoGen':
313                               
314                                #build a dictionary with format as the key
315                                isoClass = words[1].split(":")
316                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
317                               
318                        elif words[0] == 'xqueryConversions':
319                               
320                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
321                                processingConfig['xqueryConversions'] = words[1].split(",")
322                               
323                        elif words[0] == 'currentMEDIN':
324                               
325                                #define the current MEDIN standard so can cast conversions as this (at some stage we will ingest a few different versions)
326                                processingConfig['currentMEDIN'] = words[1]
327                               
328                        elif words[0] == 'exceptXqConv':
329                               
330                                #build a dictionary with format as the key
331                                xqClass = words[1].split(":")
332                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
333                               
334                        elif words[0] == 'xqDocTypes':
335                               
336                                #build a dictionary with format as the key
337                                xqClass = words[1].split(":")
338                               
339                                if 'xqDocTypes' not in processingConfig.keys():
340                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
341                                else:
342                                        temp = processingConfig['xqDocTypes']                                   
343                                        temp.update({xqClass[0]:xqClass[1]})
344                                        processingConfig['xqDocTypes'] = temp
345                                       
346                processing_config_file.close()
347                       
348                return processingConfig
349       
350       
351        def getConfigDetails(self, datacentre):
352                '''
353                Get the harvested records directory and groups for this datacentre from the
354                datacentre specific config file.  The harvested records directory depends on the
355                datacentres OAI base url, the set and format. These have to be know up-front.
356                The groups denote which 'portal groups' they belong to - for limiting searches to
357                say NERC-only datacentres records.
358                Groups are added to the intermediate MOLES when it is created.
359                @param datacentre: datacentre to use when looking up config file
360                '''
361                # initialise the variables to retrieve from the config file
362                self._harvest_home = ""
363                self._datacentre_groups = ""
364                self._datacentre_format = ""
365                self._datacentre_namespace = ""
366                self._NDG_dataProvider = False
367                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
368               
369               
370                #note changed to production buildout path
371                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
372               
373                #fed up with this rubbish causing problems - use a normal file opener.
374                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
375               
376               
377                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
378                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
379               
380                # Check this file exists; if not, assume an invalid datacentre has been specified
381                if not os.path.isfile(self._datacentre_config_filename):
382                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
383                        "specified (%s) is invalid\n" %datacentre)
384                   
385               
386                datacentre_config_file = open(self._datacentre_config_filename, "r")
387               
388                for line in datacentre_config_file.readlines():
389                        words  = string.split(line)
390                        if len(words) == 0:
391                                continue
392                        elif words[0] == 'host_path':
393                                self._harvest_home = words[1].rstrip()
394                        elif words[0] == 'groups':
395                                self._datacentre_groups = words[1:]
396                        elif words[0] == 'format':
397                                self._datacentre_format = words[1]
398                        elif words[0] == 'namespace':
399                                self._datacentre_namespace = words[1]
400                        elif words[0] == 'NDG_dataProvider':
401                                self._NDG_dataProvider = True
402                        elif words[0] == 'feed':
403                                self._IngestViaFeed = True
404                       
405                datacentre_config_file.close()
406               
407                #stop ingest on this datacentre if process thread is OAI and the config says FEED
408                if ((self.processThread == 'OAI') and self._IngestViaFeed):
409                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
410                                               
411                if self._harvest_home == "":
412                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
413               
414                logging.info("harvested records are in " + self._harvest_home)
415               
416                if self._datacentre_groups == "":
417                    logging.info("No groups/keywords set for datacentre " + datacentre)
418                else:
419                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
420               
421                if self._datacentre_format == "":
422                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
423               
424                logging.info("format being harvested: " + self._datacentre_format)
425               
426                if self._datacentre_namespace == "":
427                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
428               
429                logging.info("datacentre namespace: " + self._datacentre_namespace)
430               
431                if self._NDG_dataProvider:
432                        logging.info("Datacentre classified as an NDG data provider")
433                else:
434                        logging.info("Datacentre is not classified as an NDG data provider")
435                logging.info(self.lineSeparator)
436               
437               
438        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
439                '''
440                Processes/renames the files (changed 08/01/07 to get id from inside file)
441                 - also replace any namespace declarations with a standard one which we know works in NDG
442                 NB, this copies files from the original dir to the discovery dir
443                 @param originals_dir: directory to convert files from
444                 @param discovery_dir: directory in which converted files will end up
445                 @return numfilesproc: counter of number of files processed
446                '''
447                numfilesproc = 0                       
448               
449                self.inputFileOrigFinal = {}
450               
451                logging.info(self.lineSeparator)
452                logging.info("Renaming files:")
453               
454                for filename in os.listdir(originals_dir):
455                       
456                        if not filename.endswith('.xml'):
457                                logging.warning('File %s is not xml format. Not processed'  %(filename))
458                                continue
459                       
460                        original_filename = originals_dir + filename
461                       
462                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
463                        if format == 'DIF_9.4':
464                                                       
465                                logging.info("Converting DIF to stub ISO...")                                   
466                               
467                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
468                                #redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
469                                                                                                               
470                                #gets the metadata id from the xml
471                                metadataID=self.getID(original_filename)
472                                                               
473                                repositoryName = self._datacentre_namespace
474                               
475                                #get the name of the file to be used in the xquery
476                                metadataFilename = filename.replace('.xml', '')
477                                                               
478                                #where is the file to be ingested located?
479                                metadataFileLoc = originals_dir
480                               
481                                #generate a new stubISO filename
482                                self.isoFormat= "stubISO"
483                                                               
484                               
485                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
486                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
487                                                                                                                       
488                                #get hold of the xml for the transformed ISO                           
489                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
490                                                               
491                                #create the iso file on the system
492                                tempStubISOfile = original_filename + ".stubISO.xml"
493                                FileUtilities.createFile(tempStubISOfile , self.isoXML)
494                               
495                                #generate the ISO object from the converted ISO here now..
496                                try:
497                                                                                               
498                                        isoDataModel = ExtractISO(tempStubISOfile,self._isoClass[self._datacentre_format])
499                               
500                                        if isoDataModel.createISOdataStructure() is True:
501                                                logging.info("ISO extractor worked fine!")
502                       
503                       
504                                        elif isoDataModel.createISOdataStructure() is True:
505                                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
506                                                #sys.exit() # TODO fix these
507                                                return
508                                        else:
509                                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
510                                                #sys.exit()
511                                                return
512                                except:
513                                        logging.error("Could not generate ISO XML!")
514                                        return
515                               
516                                                               
517                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
518                                self.originalXMLdoc = ''
519                               
520                                try:
521                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
522                                        self.originalXMLdoc = file(original_filename).read()
523                                       
524                                except:
525                                        logging.warn("Could not extract original file to local variable!")
526                               
527                               
528                                #Back to main stream of ISO ingest
529                                if self._NDG_dataProvider:
530                                                               
531                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
532                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + self.isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
533                                        new_filename_short = self._datacentre_namespace + "__" + self.isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
534                               
535                                       
536                                else:
537                                        ident = isoDataModel.datasetID[0][0]
538                                       
539                                        ident = ident.replace(":", "-")
540                                        ident = ident.replace("/", "-")
541                               
542                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ self.isoFormat + "__"+ ident +".xml"
543                                        new_filename_short = self._datacentre_namespace+ "__"+ self.isoFormat + "__"+ ident +".xml"
544                               
545                                       
546                               
547                                #now create this stub ISO file on system so can access it
548                                self.genIsoFile = discovery_dir + new_filename_short   
549                                                               
550                                #FileUtilities.createFile(self.stubIsoFile, self.isoXML)
551                                #generate the redirect urls - now we have worked out the full discovery ndg name..
552                               
553                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
554                               
555                       
556                       
557                        #all other ISO formats/profiles to deal with.
558                        else:                           
559                                isoFormat = format
560                               
561                                self.originalXMLdoc = "null"
562                               
563                               
564                                #generate the ISO object from the ISO xml..                                                                             
565                                isoDataModel = ExtractISO(original_filename,self._isoClass[self._datacentre_format])
566                               
567                                if isoDataModel.createISOdataStructure() is True:
568                                                logging.info("ISO extractor worked fine!")
569                       
570                                elif isoDataModel.createISOdataStructure() is True:
571                                        logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
572                                                #sys.exit() # TODO fix these
573                                        return
574                                else:
575                                        logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
576                                                #sys.exit()
577                                        return
578                               
579                                #Back to main stream of ISO ingest
580                                if self._NDG_dataProvider:
581                                                               
582                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
583                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
584                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
585                               
586                                else:
587                                       
588                                        ident = isoDataModel.datasetID[0][0]
589                               
590                                        ident = ident.replace(":", "-")
591                                        ident = ident.replace("/", "-")
592                               
593                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
594                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
595                               
596                                #now create this stub ISO file on system so can access it
597                                self.genIsoFile = discovery_dir + new_filename_short   
598                               
599                                #generate the redirect urls - now we have worked out the full discovery ndg name..
600                                isoDataModel.generateNDGredirectURL(self._ndgRedirectURL,self.genIsoFile)
601                               
602                                #get the converted ISO into a local var
603                                try:
604                                        logging.info("Extracting converted ISO file into variable for input into original format in transformed docs table")
605                                        self.isoXML = file(self.genIsoFile).read()
606                                        self.originalXMLdoc = self.isoXML
607                                       
608                                except:
609                                        logging.warn("Could not extract converted ISO xml to local variable!")
610                                        self.isoXML = ''
611                                       
612                        logging.info("original file = " + original_filename)
613                        logging.info("newfile = " + new_filename)
614                               
615                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
616                        #this links a derived filename in processing dir with original filename
617                        #get basic filename from the path+filename passed to this function to use as key               
618                        self.inputFileOrigFinal[new_filename_short]=filename
619                       
620                        '''
621                        TODO: do we need this namespace corrector anymore as not using MOLES?
622               
623                        # now correct any namespace issues
624                        try:
625                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
626                        except Exception, detail:
627                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
628                                logging.error("Detail: %s" %detail)
629                                logging.info("Continue with next file")
630                                continue
631                        '''
632                        numfilesproc += 1
633               
634                       
635                logging.info("File renaming and converting completed")
636                logging.info(self.lineSeparator)
637               
638                       
639                return numfilesproc
640       
641       
642        def _getPostgresDBConnection(self):
643                '''
644                Get the default postgres DB connection - by reading in data from the db config file
645                '''
646               
647                logging.debug("Setting up connection to postgres DB")
648                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
649               
650               
651                self.pgc = pgc(configFile = self._databaseConfigurationFile)
652                logging.info("Postgres DB connection now set up")
653
654
655
656        def     _backupAndCleanData(self):
657                '''
658                Back up ingested data for specified data centre, then clean any of
659                the ingest dirs
660                '''
661                logging.info("Backing up ingested data")
662                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
663                  strftime("%y%m%d_%H%M") + "_originals/"
664               
665                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
666                logging.info("Data backed up - now clearing ingest directories")
667                #Clear out the original harvest records area and discovery dir
668                #FileUtilities.cleanDir(self.originals_dir)
669                #FileUtilities.cleanDir(self.discovery_dir)
670                logging.info("Ingest directories cleared")
671
672
673        def     _setupDataCentreDirs(self):
674                '''
675                Set up directories appropriate for the current data centre
676                '''
677                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
678               
679               
680                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
681               
682                # the following dirs define where the specific documents should go
683                self.originals_dir = data_dir + "/oai/originals/" # copy of original
684                self.discovery_dir = data_dir + "/discovery/"
685                #self.stubISO_dir = data_dir + "/stub_iso/"
686               
687                # Create/clear the 'in' and 'out' directories
688                FileUtilities.setUpDir(self.originals_dir)
689                FileUtilities.setUpDir(self.discovery_dir)
690               
691                logging.info("Ingest directories for data centre set up")
692
693
694        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
695                '''
696                Convert files from originals dir to discovery one, then
697                ingest, backup and clean
698                @param originals_dir: directory to ingest docs from
699                @param discovery_dir: directory to use to process ingested docs
700                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
701                '''
702               
703               
704                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
705               
706                filenames = os.listdir(self.discovery_dir)
707                               
708                #generate a list of files ALREADY in database so can compare with what has been ingested
709                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
710                               
711                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
712                filesPresentList=[]
713       
714               
715                if filePresentListArr:
716                        for fileArr in filePresentListArr:
717                                filesPresentList.append(fileArr[0])
718                               
719                                #TODO - is above relevant - duplicating functionlaity?
720       
721                #create list to to hold files ingest failed on.
722                self.updateFailList = []
723                self.deletedFailList = []                               
724               
725                for filename in filenames:
726                       
727                        fullPath = self.discovery_dir + filename
728                       
729                        if os.path.isfile(fullPath):
730                                       
731                                thisIngestedID = self.addFileToPostgresDB(fullPath)
732                               
733                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
734                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
735                                        if thisIngestedID in filesPresentList: 
736                                                filesPresentList.remove(thisIngestedID)                 
737                               
738                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
739                #will need to be removed.
740               
741                #only do this if not in single file mode (or else wverything else gets deleted!)               
742                if ((self.indFileToIngest == "") & (feed)):
743                        for item in filesPresentList:
744                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
745                                DeleteRecord(item)
746                                self.deletedFailList.append(item)
747                                self._no_files_deleted += 1
748                       
749                       
750                self._backupAndCleanData()
751               
752                #at this stage put the reporting code in (will work for oai or atom feed)
753                #create a summary file for each data centre ingest
754                data_dir = self._base_dir + "data/"
755                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
756                recOpFile = open(recOpFileName,'w')
757               
758                logging.info("oai_document_ingest processing complete:")
759               
760                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
761                message = "Ingest report for data centre: " + datacentre + "\n"
762                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
763                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
764                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
765                message = message + "INGESTED_Created " + str(self._no_files_changed)  + "\n"
766                message = message + "INGESTED_Updated " + str(self._no_files_ingested)  + "\n"
767                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
768                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
769               
770               
771                for badFile in self.updateFailList:
772                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
773                        message = message +"PROBLEM_FILE " + badFile + "\n"
774                         
775                recOpFile.write(message)
776                               
777                return numfilesproc, message
778
779
780
781        def _setupXQueries(self):
782                '''
783                Set up the required XQueries
784                - NB, extract the xquery libraries locally for easy reference
785                '''
786                self._xq = ndgResources()
787                for libFile in self._xq.xqlib:
788                        # NB, we don't want the full path to the files - just the filename
789                        fileName = libFile.split('/')[-1]
790                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.