source: TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py @ 6544

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v4n_MEDIN/ingestAutomation-upgrade/OAIBatch/abstractdocumentingester.py@6544
Revision 6544, 26.3 KB checked in by sdonegan, 11 years ago (diff)

Updated version of MEDIN ingest - will ingest NEODC DIFs, convert to ISO then load ISO into DB - all original NDG3 functionality inc configurable ingest now included.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Main script to do the document ingest from the OAI harvested files to the
4 discovery postgres DB.  NB, can be ran for all datacentres using the run_all_ingest.py script
5 or can specify an individual datacentre to run the ingester on.
6 As well as doing the ingest, a backup directory is created to store the created moles files.
7'''
8import os, sys, string, getopt, logging, re, pkg_resources
9from time import strftime
10from SchemaNameSpace import SchemaNameSpace
11from ndg.common.src.lib.ndgresources import ndgResources
12import ndg.common.src.lib.fileutilities as FileUtilities
13from ndg.common.src.clients.reldb.postgres.postgresclient import PostgresClient as pgc
14from PostgresRecord import PostgresRecord
15from PostgresDAO import PostgresDAO
16from Utilities import idget
17from Utilities import ndgRedirectTransform,redirectUrlChanger,xqueryTransformation
18from DeleteRecord import DeleteRecord
19import datetime,time
20from ndg.common.src.models.ndgObject import ndgObject
21from ndg.common.src.lib.ndgresources import ndgResources
22from ExtractISO import ExtractISO
23import inspect
24
25#temp
26from AbstractFormatIngester_original import AbstractFormatIngesterOriginal
27
28class AbstractDocumentIngester(object):
29        '''
30        Class to handle the ingest of files from the OAI harvester to the discovery service postgres DB
31        - including running the various transforms and parsings to get all doc types and spatiotemporal
32        data in the correct form in the DB
33        '''
34        lineSeparator = "------------------------------"
35               
36        # The directory to put things for a tape backup (should already exist)
37        BACKUP_DIR = '/home/badc/discovery_docs/ingestDocs/backup/'
38       
39        #NDG_redirect_URL = 'http://ndg3beta.badc.rl.ac.uk/URL_redirect_NDG3/ndgURLredirect/redirect?url='
40               
41        def _setupCmdLineOptions(self):
42                '''
43                Determine the logging level to use and configure this appropriately
44                @return args: any input arguments - excluding options
45                '''
46               
47                # check for verbose option
48                try:
49                        opts, args = getopt.getopt(sys.argv[1:], "vd")
50                except getopt.GetoptError, err:
51                    # print help information and exit:
52                    print str(err) # will print something like "option -a not recognized"
53                    sys.exit(2)
54                   
55                if len(args) < 1:
56                        self.usage()
57                   
58                loggingLevel = logging.WARNING
59                for o, a in opts:
60                    if o == "-v":
61                        print " - Verbose mode ON"
62                        loggingLevel = logging.INFO
63                    elif o == "-d":
64                        print " - Debug mode ON"
65                        loggingLevel = logging.DEBUG
66                   
67               
68                # set up any keywords on the object
69                # NB, be careful to keep the instance variables the same name as the keywords! (not also feed keywords also in here)
70                checkConfInArgs = False
71                for arg in args:
72                       
73                        keywordArgs=['ingestFromDate','interval','individualFile','interval','ingestFromDate','eXistDBHostname','eXistPortNo','dataCentrePoll','ingestConfigFile']                                             
74                       
75                        bits = arg.split('=')
76                        if len(bits) == 2:
77                                if bits[0] == keywordArgs[0]:
78                                        self.setIngestFromDate(bits[1])
79                                elif bits[0] == keywordArgs[1]:
80                                        self.setPollInterval(bits[1])
81                                elif bits[0] == keywordArgs[8]:
82                                        print " - Running with specified config file!"
83                                        self.setOaiConfigFile(bits[1])
84                                        checkConfInArgs = True
85                                elif bits[0] == keywordArgs[2]:
86                                        print " - Running in single file ingestion mode!"
87                                        self.setIndFileToIngest(bits[1])
88                                elif bits[0] not in keywordArgs:
89                                        print "\nWARNING: invalid keyword supplied (%s)! \n"%bits[0]
90                                        sys.exit(2)
91                                else:
92                                        setattr(self, bits[0], bits[1])
93                       
94                if checkConfInArgs is False:
95                        print "\nWARNING: You must now include the ingestConfigFile arguement for successful operation"
96                        sys.exit(2)
97               
98                print self.lineSeparator
99               
100                # NB, this is a slight fudge as cannot get the detailed logging to work
101                # without setting up a new logger here - which means we get two loggers
102                # outputing data. The initial call to logging needs to be tracked down
103                # and configured correctly, so this can be avoided...
104#               self.logger = logging.getLogger()
105#               self.logger.setLevel(loggingLevel)
106
107                # create console handler and set level to debug
108#               ch = logging.StreamHandler()
109#               ch.setLevel(loggingLevel)
110                # create formatter
111#               formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
112                # add formatter to ch
113#               ch.setFormatter(formatter)
114                # add ch to logger
115#               self.logger.addHandler(ch)
116               
117                return args
118
119
120        def getID(self, filename):
121                '''
122                Gets the identifier out of an input metadata xml record.
123                @param filename - name of document file being processed
124                @return: ID - id to use to refer to the document
125                '''
126                logging.info("Retrieving identifier for metadata record " + filename)
127       
128                xml=file(filename).read()
129               
130                ID = idget(xml)
131               
132                return ID
133       
134       
135        def addFileToPostgresDB(self, filename):
136                '''
137                Add a file to the postgres DB - extracting and storing all the required
138                data in the process
139                @param filename: full path of file to add to postgres DB
140                '''
141                logging.info("Adding file, " + filename + ", to postgres DB")
142               
143                numSlash = len(filename.split('/'))
144                               
145                shortFilename = filename.split('/')[numSlash - 1]
146               
147                self.recordAttemptToIngestDiscoveryID = ""                             
148               
149                # first of all create a PostgresRecord - this object represents all the data required
150                # for a DB entry
151                dao = None
152               
153                try:
154                       
155                        #set up iso xml object - format indicatees the ISO profile (including dif2iso conversions)                                             
156                        self.isoDataModel = ExtractISO(filename,self._isoClass[self._datacentre_format])
157                       
158                       
159                        if self.isoDataModel.createISOdataStructure() is True:
160                                logging.info("ISO extractor worked fine!")
161                       
162                       
163                        elif self.isoDataModel.createISOdataStructure() is True:
164                                logging.error ("Something wrong with ISO extractor... (ExtractISO returned False: xml access problem?)")                       
165                                #sys.exit() # TODO fix these
166                                return
167                        else:
168                                logging.error("Something SERIOUSELY wrong with extractor (couldnt get at ExtractISO class)")                   
169                                #sys.exit()
170                                return
171                               
172                        #record whats bout to be ingestd
173                        self.recordAttemptToIngestDiscoveryID = self.isoDataModel.datasetID[0][0]
174                       
175                        #In new ingest system all vals parsed from iso xml are returned as either lists of dictionaries
176                       
177                        #record whats attempting to be ingested         
178                        record = PostgresRecord(filename, self._NDG_dataProvider,self._datacentre_groups, self._datacentre_namespace,
179                                                                self.isoDataModel,self._xq, self._datacentre_format, self._xqueryConversionsExceptions, 
180                                                                self._xqueryConversions,self._saxonJarFile, self._xqueryDocTypes, self.originalXMLdoc, stubIso = self.isoXML)                   
181                       
182                        # Now create the data access object to interface to the DB                     
183                        dao = PostgresDAO(record, self.isoDataModel ,self.discovery_dir , pgClient = self.pgc )
184                       
185                        # Finally, write the new record
186                        # 0 if failed, 1 if update, 2 if create
187                        returnCode = dao.createOrUpdateRecord() 
188                                               
189                        if returnCode == 2:
190                                self._no_files_ingested += 1
191                               
192                        elif returnCode == 1:
193                                self._no_files_changed += 1
194                               
195                       
196                except:
197                       
198                       
199                        #if error encountered, add to failure lisr
200                        logging.error("Could not update: " + filename)                 
201                        originalRecordFilename = self.inputFileOrigFinal[shortFilename]
202                        self.updateFailList.append(originalRecordFilename)
203                       
204                        logging.error("Exception thrown - detail: ")
205                        errors = sys.exc_info()
206                        logging.error(errors)
207                        self._error_messages += "%s\n" %str(errors[1])
208                       
209                        if dao:
210                                logging.info("Removing record and its associated info from DB")
211                                logging.info("- to allow clean ingestion on rerun")
212                                try:
213                                        dao.deleteOriginalRecord()
214                                except:
215                                        logging.error("Problem encountered when removing record: ")
216                                        logging.error(sys.exc_info())
217                                        logging.error("NB, this record will need to be cleared manually from DB to ensure all relevant data is ingested")
218
219                        self._no_problem_files += 1
220                       
221                        logging.info("Continue processing other files")
222                       
223                return self.recordAttemptToIngestDiscoveryID
224                       
225       
226        def getProcessingConfig(self,configFilePath):
227               
228                '''
229                Fed up with messing about with hardcoded directory paths. 
230                This method to get relevant values out of the oai_document_ingester.config file
231               
232                Returns a dictionary of values with keys:
233               
234                #directory in which the code resides
235                code_directory /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/
236
237                #base directory in which metadata is extracted to and converted to
238                base_directory /home/badc/discovery_docs/ingestDocs/
239
240                #directory in which to write reports to
241                reporting_directory /home/badc/discovery_docs/ingestDocs/data/
242
243                #path to the passwords file
244                passwords_file /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/passwords.txt
245
246                #datacentre config file directory path
247                datcentre_configs /home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/
248                '''
249               
250                # Check this file exists; if not, assume an invalid datacentre has been specified
251               
252                if not os.path.isfile(configFilePath):
253                    sys.exit("ERROR: Could not find the processing config file")
254                   
255                processingConfig = {}
256                   
257                processing_config_file = open(configFilePath, "r")
258               
259                for line in processing_config_file.readlines():
260                        words  = string.split(line)
261                        if len(words) == 0:
262                                continue
263                        elif words[0] == 'code_directory':
264                                processingConfig['code_directory'] = words[1]
265                                                                                               
266                        elif words[0] == 'base_directory':
267                                processingConfig['base_directory'] = words[1]
268                               
269                        elif words[0] == 'reporting_directory':
270                                processingConfig['reporting_directory'] = words[1]
271                               
272                        elif words[0] == 'passwords_file':
273                                processingConfig['passwords_file'] = words[1]
274                               
275                        elif words[0] == 'datcentre_configs':
276                                processingConfig['datcentre_configs'] = words[1]
277                                       
278                        elif words[0] == 'NDG_redirect_URL':
279                                processingConfig['NDG_redirect_URL'] = words[1]
280                               
281                        elif words[0] == 'ingestConfig':
282                                if not os.path.isfile(words[1]):
283                                        sys.exit("ERROR: Could not find the Database configuration file (ingest.config)!!")
284                                else:
285                                        processingConfig['ingestConfig'] = words[1]
286                               
287                        elif words[0] == 'acceptedFormats':
288                                processingConfig['acceptedFormats'] = words[1]
289                               
290                        elif words[0] == 'saxonJarFile':
291                                if not os.path.isfile(words[1]):
292                                        sys.exit("ERROR: Could not find the SAXON JAR file!!")
293                                else:
294                                        processingConfig['saxonJarFile'] = words[1]
295                        elif words[0] == 'xpathIsoClass':
296                               
297                                #build a dictionary with format as the key                             
298                                isoClass = words[1].split(":")                         
299                               
300                                if 'xpathIsoClass' not in processingConfig.keys():
301                                        processingConfig['xpathIsoClass'] = {isoClass[0]:isoClass[1]}
302                                else:
303                                        temp = processingConfig['xpathIsoClass']                                       
304                                        temp.update({isoClass[0]:isoClass[1]})
305                                        processingConfig['xpathIsoClass'] = temp
306                               
307                        elif words[0] == 'xqueryStubIsoGen':
308                               
309                                #build a dictionary with format as the key
310                                isoClass = words[1].split(":")
311                                processingConfig['xqueryStubIsoGen'] = {isoClass[0]:isoClass[1]} 
312                               
313                        elif words[0] == 'xqueryConversions':
314                               
315                                #build a list of acceptable formats to convert into from ISO (except where format matches input format
316                                processingConfig['xqueryConversions'] = words[1].split(",")
317                               
318                        elif words[0] == 'exceptXqConv':
319                               
320                                #build a dictionary with format as the key
321                                xqClass = words[1].split(":")
322                                processingConfig['exceptXqConv'] = {xqClass[0]:xqClass[1]} 
323                               
324                        elif words[0] == 'xqDocTypes':
325                               
326                                #build a dictionary with format as the key
327                                xqClass = words[1].split(":")
328                               
329                                if 'xqDocTypes' not in processingConfig.keys():
330                                        processingConfig['xqDocTypes'] = {xqClass[0]:xqClass[1]}
331                                else:
332                                        temp = processingConfig['xqDocTypes']                                   
333                                        temp.update({xqClass[0]:xqClass[1]})
334                                        processingConfig['xqDocTypes'] = temp
335                                       
336                processing_config_file.close()
337                       
338                return processingConfig
339       
340       
341        def getConfigDetails(self, datacentre):
342                '''
343                Get the harvested records directory and groups for this datacentre from the
344                datacentre specific config file.  The harvested records directory depends on the
345                datacentres OAI base url, the set and format. These have to be know up-front.
346                The groups denote which 'portal groups' they belong to - for limiting searches to
347                say NERC-only datacentres records.
348                Groups are added to the intermediate MOLES when it is created.
349                @param datacentre: datacentre to use when looking up config file
350                '''
351                # initialise the variables to retrieve from the config file
352                self._harvest_home = ""
353                self._datacentre_groups = ""
354                self._datacentre_format = ""
355                self._datacentre_namespace = ""
356                self._NDG_dataProvider = False
357                self._IngestViaFeed = False # need to distinguish between oai and feed ingest
358               
359               
360                #note changed to production buildout path
361                #self._datacentre_config_filename = '/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/datacentre_config/' + datacentre + "_config.properties"
362               
363                #fed up with this rubbish causing problems - use a normal file opener.
364                #file = pkg_resources.resource_string('OAIBatch', datacentre_config_filename)
365               
366               
367                self._datacentre_config_filename = self._code_dir + 'datacentre_config/' + datacentre + "_config.properties"
368                logging.info("Retrieving data from datacentre config file, " + self._datacentre_config_filename)
369               
370                # Check this file exists; if not, assume an invalid datacentre has been specified
371                if not os.path.isfile(self._datacentre_config_filename):
372                    sys.exit("ERROR: Could not find the config file; either this doesn't exist or the datacentre " \
373                        "specified (%s) is invalid\n" %datacentre)
374                   
375               
376                datacentre_config_file = open(self._datacentre_config_filename, "r")
377               
378                for line in datacentre_config_file.readlines():
379                        words  = string.split(line)
380                        if len(words) == 0:
381                                continue
382                        elif words[0] == 'host_path':
383                                self._harvest_home = words[1].rstrip()
384                        elif words[0] == 'groups':
385                                self._datacentre_groups = words[1:]
386                        elif words[0] == 'format':
387                                self._datacentre_format = words[1]
388                        elif words[0] == 'namespace':
389                                self._datacentre_namespace = words[1]
390                        elif words[0] == 'NDG_dataProvider':
391                                self._NDG_dataProvider = True
392                        elif words[0] == 'feed':
393                                self._IngestViaFeed = True
394                       
395                datacentre_config_file.close()
396               
397                #stop ingest on this datacentre if process thread is OAI and the config says FEED
398                if ((self.processThread == 'OAI') and self._IngestViaFeed):
399                        sys.exit("Trying to ingest a FEED based centre.  Better stop now before it all gets a bit tricky!")
400                                               
401                if self._harvest_home == "":
402                    sys.exit("Failed at getting harvested records directory stage. datacentre config file tried = %s" %datacentre_config_filename)
403               
404                logging.info("harvested records are in " + self._harvest_home)
405               
406                if self._datacentre_groups == "":
407                    logging.info("No groups/keywords set for datacentre " + datacentre)
408                else:
409                    logging.info("datacentre groups/keywords: " + str(self._datacentre_groups))
410               
411                if self._datacentre_format == "":
412                    sys.exit("Failed at stage: getting datacentre format. datacentre config file tried = %s" %datacentre_config_filename)
413               
414                logging.info("format being harvested: " + self._datacentre_format)
415               
416                if self._datacentre_namespace == "":
417                    sys.exit("Failed at stage: getting datacentre namespace. datacentre config file tried = %s" %datacentre_config_filename)
418               
419                logging.info("datacentre namespace: " + self._datacentre_namespace)
420               
421                if self._NDG_dataProvider:
422                        logging.info("Datacentre classified as an NDG data provider")
423                else:
424                        logging.info("Datacentre is not classified as an NDG data provider")
425                logging.info(self.lineSeparator)
426               
427               
428        def _convertIngestFiles(self, originals_dir, discovery_dir, format):
429                '''
430                Processes/renames the files (changed 08/01/07 to get id from inside file)
431                 - also replace any namespace declarations with a standard one which we know works in NDG
432                 NB, this copies files from the original dir to the discovery dir
433                 @param originals_dir: directory to convert files from
434                 @param discovery_dir: directory in which converted files will end up
435                 @return numfilesproc: counter of number of files processed
436                '''
437                numfilesproc = 0                       
438               
439                self.inputFileOrigFinal = {}
440               
441                logging.info(self.lineSeparator)
442                logging.info("Renaming files:")
443               
444                for filename in os.listdir(originals_dir):
445                       
446                        if not filename.endswith('.xml'):
447                                logging.warning('File %s is not xml format. Not processed'  %(filename))
448                                continue
449                       
450                        original_filename = originals_dir + filename
451                       
452                        #if importing old dif format need to do a few extra things: 1. deal with urls to be redirected.  2. Convert to stub ISO so can continue with ISO ingest!
453                        if format == 'DIF_9.4':
454                                                       
455                                logging.info("Converting DIF to stub ISO...")                                   
456                               
457                                #if DIF still need to convert URL's using the old method so when requesting DIF in original format we still have correct redirect urls....
458                                redirectUrlChanger(original_filename,original_filename,self._datacentre_format, self._ndgRedirectURL)
459                                                                                                               
460                                #gets the metadata id from the xml
461                                metadataID=self.getID(original_filename)
462                                                               
463                                repositoryName = self._datacentre_namespace
464                               
465                                #get the name of the file to be used in the xquery
466                                metadataFilename = filename.replace('.xml', '')
467                                                               
468                                #where is the file to be ingested located?
469                                metadataFileLoc = originals_dir
470                               
471                                #generate a new stubISO filename
472                                self.isoFormat= "stubISO"
473                                                               
474                                #self.isoXML = self.convertDIFtoISO(metadataFileLoc,repositoryName,metadataID,metadataFilename)
475                                self.xqueryTransformation = xqueryTransformation(metadataFileLoc,repositoryName,metadataID,metadataFilename,self._saxonJarFile)
476                               
477                                #get hold of the xml for the transformed ISO
478                                self.isoXML = self.xqueryTransformation.runXquery(self._dif2isoXquery)
479                               
480                                #need to get hold original format XML too (so can put directly into "transformed" docs section...
481                                self.originalXMLdoc = ''
482                               
483                                try:
484                                        logging.info("Extracting original input file into variable for input into original format in transformed docs table")
485                                        self.originalXMLdoc = file(original_filename).read()
486                                       
487                                except:
488                                        logging.warn("Could not extract original file to local variable!")
489                               
490                               
491                                #Back to main stream of ISO ingest
492                                if self._NDG_dataProvider:
493                                                               
494                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
495                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + self.isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
496                                        new_filename_short = self._datacentre_namespace + "__" + self.isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
497                               
498                                else:
499                               
500                                        ident = ident.replace(":", "-")
501                                        ident = ident.replace("/", "-")
502                               
503                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ self.isoFormat + "__"+ ident +".xml"
504                                        new_filename_short = self._datacentre_namespace+ "__"+ self.isoFormat + "__"+ ident +".xml"
505                               
506                               
507                                #now create this stub ISO file on system so can access it
508                                self.stubIsoFile = discovery_dir + new_filename_short   
509                                                               
510                                FileUtilities.createFile(self.stubIsoFile, self.isoXML)
511                               
512                                logging.info("Stub ISO file created - at %s" %discovery_dir)
513                               
514                       
515                        #all other ISO formats/profiles to deal with.
516                        else:                           
517                                isoFormat = format
518                               
519                                #Back to main stream of ISO ingest
520                                if self._NDG_dataProvider:
521                                                               
522                                        #changed so new new NDG3 discovery can pick up Discoveru View URL's correctly?  (why should it be different to previous?)
523                                        new_filename = discovery_dir + self._datacentre_namespace + "__" + isoFormat + "__"+ metadataFilename.replace(":", "-")+".xml"
524                                        new_filename_short = self._datacentre_namespace + "__" + isoFormat + "__" + metadataFilename.replace(":", "-")+".xml"
525                               
526                                else:
527                               
528                                        ident = ident.replace(":", "-")
529                                        ident = ident.replace("/", "-")
530                               
531                                        new_filename = discovery_dir + "/" +self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
532                                        new_filename_short = self._datacentre_namespace+ "__"+ isoFormat + "__"+ ident +".xml"
533                               
534                               
535                        logging.info("original file = " + original_filename)
536                        logging.info("newfile = " + new_filename)
537                               
538                        #create list of all ORIGINAL filenames for ingest reporting (use actual filename)
539                        #this links a derived filename in processing dir with original filename
540                        #get basic filename from the path+filename passed to this function to use as key               
541                        self.inputFileOrigFinal[new_filename_short]=filename
542                       
543                        '''
544                        TODO: do we need this namespace corrector anymore as not using MOLES?
545               
546                        # now correct any namespace issues
547                        try:
548                            SchemaNameSpace(original_filename, new_filename, self._datacentre_format)
549                        except Exception, detail:
550                                logging.error("SchemaNameSpace failed on file %s" %original_filename)
551                                logging.error("Detail: %s" %detail)
552                                logging.info("Continue with next file")
553                                continue
554                        '''
555                        numfilesproc += 1
556               
557                       
558                logging.info("File renaming and converting completed")
559                logging.info(self.lineSeparator)
560               
561                       
562                return numfilesproc
563       
564       
565        def _getPostgresDBConnection(self):
566                '''
567                Get the default postgres DB connection - by reading in data from the db config file
568                '''
569               
570                logging.debug("Setting up connection to postgres DB")
571                #self.pgc = pgc(configFile = '/usr/local/ndg-oai-info-editor/ingest.config')
572               
573               
574                self.pgc = pgc(configFile = self._databaseConfigurationFile)
575                logging.info("Postgres DB connection now set up")
576
577
578
579        def     _backupAndCleanData(self):
580                '''
581                Back up ingested data for specified data centre, then clean any of
582                the ingest dirs
583                '''
584                logging.info("Backing up ingested data")
585                this_backupdir = self.BACKUP_DIR + self.dataCentre + "_" + \
586                  strftime("%y%m%d_%H%M") + "_originals/"
587               
588                #FileUtilities.makeBackUp(self.originals_dir, this_backupdir)
589                logging.info("Data backed up - now clearing ingest directories")
590                #Clear out the original harvest records area and discovery dir
591                #FileUtilities.cleanDir(self.originals_dir)
592                #FileUtilities.cleanDir(self.discovery_dir)
593                logging.info("Ingest directories cleared")
594
595
596        def     _setupDataCentreDirs(self):
597                '''
598                Set up directories appropriate for the current data centre
599                '''
600                logging.info("Setting up ingest directories for data centre, '%s'" %self.dataCentre)
601               
602               
603                data_dir = self._base_dir + "data/" + self.dataCentre  # dir relating to the specified dataCentre docs
604               
605                # the following dirs define where the specific documents should go
606                self.originals_dir = data_dir + "/oai/originals/" # copy of original
607                self.discovery_dir = data_dir + "/discovery/"
608                #self.stubISO_dir = data_dir + "/stub_iso/"
609               
610                # Create/clear the 'in' and 'out' directories
611                FileUtilities.setUpDir(self.originals_dir)
612                FileUtilities.setUpDir(self.discovery_dir)
613               
614                logging.info("Ingest directories for data centre set up")
615
616
617        def _convertAndIngestFiles(self, originals_dir, discovery_dir, datacentre, feed, format):
618                '''
619                Convert files from originals dir to discovery one, then
620                ingest, backup and clean
621                @param originals_dir: directory to ingest docs from
622                @param discovery_dir: directory to use to process ingested docs
623                @param feed: boolean, False if calling func is feedDocumentIngester, True - all other
624                '''
625               
626               
627                numfilesproc = self._convertIngestFiles(self.originals_dir, self.discovery_dir, format)
628               
629               
630                filenames = os.listdir(self.discovery_dir)
631                               
632                #generate a list of files ALREADY in database so can compare with what has been ingested
633                sqlCmd = "select discovery_id from original_document where discovery_id like '%" + self._datacentre_namespace + "%'"           
634                               
635                filePresentListArr = self.pgc.runSQLCommand(sqlCmd)
636                filesPresentList=[]
637       
638               
639                if filePresentListArr:
640                        for fileArr in filePresentListArr:
641                                filesPresentList.append(fileArr[0])
642                               
643                                #TODO - is above relevant - duplicating functionlaity?
644       
645                #create list to to hold files ingest failed on.
646                self.updateFailList = []
647                self.deletedFailList = []                               
648               
649                for filename in filenames:
650                        fullPath = self.discovery_dir + filename
651                       
652                        if os.path.isfile(fullPath):
653                                               
654                                logging.info("********************************************************************** sjd1 " + fullPath)
655                                               
656                                thisIngestedID = self.addFileToPostgresDB(fullPath)
657                               
658                                logging.info("********************************************************************** sjd2 " + thisIngestedID)
659                               
660                                #remove this filename from the list of files for this DC already in list (whats left over is to be deleted)
661                                if (thisIngestedID != "") and (len(filesPresentList) != 0):
662                                        if thisIngestedID in filesPresentList: 
663                                                filesPresentList.remove(thisIngestedID)                 
664                               
665                #test loop through remaining items in filePresentList - in order to synchronise with ingest directory - these
666                #will need to be removed.
667               
668                #only do this if not in single file mode (or else wverything else gets deleted!)               
669                if ((self.indFileToIngest == "") & (feed)):
670                        for item in filesPresentList:
671                                logging.info("NEED to DELETE : " + item + " IN ORDER TO SYNCHRONISE INGEST DIRECTORY WITH DATABASE CONTENTS")                   
672                                DeleteRecord(item)
673                                self.deletedFailList.append(item)
674                                self._no_files_deleted += 1
675                       
676                       
677                self._backupAndCleanData()
678               
679                #at this stage put the reporting code in (will work for oai or atom feed)
680                #create a summary file for each data centre ingest
681                data_dir = self._base_dir + "data/"
682                recOpFileName = data_dir + datacentre + "_ingestSummary.txt"           
683                recOpFile = open(recOpFileName,'w')
684               
685                logging.info("oai_document_ingest processing complete:")
686               
687                #Update log file details op on ingest to s atring which is THEN wrtten to a file, so as can return and email for info editor
688                message = "Ingest report for data centre: " + datacentre + "\n"
689                message = message + "Ingest date: " + str(datetime.datetime.now()) + "\n"
690                message = message + "Original metadata directory: " + self._harvest_home + "\n\n"
691                message = message + "TOTAL_PROCESSED " + str(numfilesproc) + "\n"
692                message = message + "INGESTED_Created " + str(self._no_files_changed)  + "\n"
693                message = message + "INGESTED_Updated " + str(self._no_files_ingested)  + "\n"
694                message = message + "DELETED " + str(self._no_files_deleted)  + "\n"
695                message = message + "PROBLEM_FILES " + str(self._no_problem_files)  + "\n"
696               
697               
698                for badFile in self.updateFailList:
699                        #recOpFile.write("PROBLEM_FILE " + badFile + "\n")
700                        message = message +"PROBLEM_FILE " + badFile + "\n"
701                         
702                recOpFile.write(message)
703                               
704                return numfilesproc, message
705
706
707
708        def _setupXQueries(self):
709                '''
710                Set up the required XQueries
711                - NB, extract the xquery libraries locally for easy reference
712                '''
713                self._xq = ndgResources()
714                for libFile in self._xq.xqlib:
715                        # NB, we don't want the full path to the files - just the filename
716                        fileName = libFile.split('/')[-1]
717                        FileUtilities.createFile(fileName, self._xq.xqlib[libFile])
Note: See TracBrowser for help on using the repository browser.