source: TI01-discovery-Ingest/trunk/v3n_NDG3/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py @ 6361

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v3n_NDG3/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@6361
Revision 6361, 13.7 KB checked in by sdonegan, 10 years ago (diff)

Updated to take configuration file location as a mandatory arguement. Example usage:../../bin/python2.5.1 oai_document_ingester.py -vd BODC individualFile=/disks/glue1/sdonegan/NDG3_workspace/buildouts/oai_document_ingester/ingestData/bodc/1048100_DIF.xml ingestConfigFile=/disks/glue1/sdonegan/NDG3_workspace/buildouts/oai_document_ingester_NDG3/ingestAutomation-upgrade/OAIBatch/oai_document_ingester.config

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Class to ingest documents from the eXist DB - by polling its feed and doing
4 ingests when updates are detected
5 @author: C Byrom, Tessella Jan 09
6'''
7import os, sys, logging, time, feedparser, datetime
8
9#TODO take this out for production (fudge to get logging to work.)
10logging.basicConfig(level=logging.DEBUG,
11                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
12
13from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
14import ndg.common.src.lib.fileutilities as FileUtilities
15from ndg.common.src.clients.xmldb.eXist.feedclient import FeedClient as feedClient
16from abstractdocumentingester import AbstractDocumentIngester
17from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
18from ndg.common.src.lib.atomutilities import *
19from ndg.common.src.models.ndgObject import ndgObject
20from ndg.common.src.clients.xmldb.eXist.searchclient import SearchClient
21import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
22
23class FeedDocumentIngester(AbstractDocumentIngester):
24        '''
25        Class to handle the ingest of files from an eXist feed
26        '''
27        INTERVAL_ERROR = "Interval, '%s', is not a float or an int"
28       
29        # interval with which to check the feed for updates
30        pollInterval = 300.0
31       
32        # published time from which feed entries will be ingested
33        ingestFromDate = datetime.datetime.now()
34
35        # temp dir to create docs for running transforms on
36        TEMP_DIR_NAME = "feedPollTempDir"
37       
38        #needs this for compatibility with conventional ingestion
39        indFileToIngest=""
40
41        def __getDBConnection(self, dBHostname, configFileName):
42                '''
43                Get the default DB connection - by reading in data from the db config file
44                - to eXist DB to get feed docs from
45                @param eXistDBHostname: the hostname of the eXist DB to use for the ingest
46                @param configFileName: name of config file to get DB details from
47                '''
48                logging.info("Setting up connection to eXist DB")
49                self.dbClient = SearchClient(dbHostName = dBHostname,
50                                                                         configFileName = configFileName)
51                logging.info("Connection to eXist now set up")
52
53       
54        def setPollInterval(self, interval):
55                '''
56                Set the polling time interval
57                @param interval: an int or float representing the time between checking
58                the atom feed for updates
59                '''
60                if isinstance(interval, str) or isinstance(interval, unicode):
61                        interval = float(interval)
62                       
63                if isinstance(interval, int) or isinstance(interval, float):
64                        self.pollInterval = interval
65                else:
66                        raise ValueError(self.INTERVAL_ERROR %str(interval))
67               
68        def setOaiConfigFile(self, configFilePath):
69                '''
70                Set the path to the OAI configuration file - directories etc used for data & reporting etc etc
71                '''
72                print "**********************************************************************"
73                if configFilePath:
74                        self.oaiEditorConfig = configFilePath
75                        logging.info("Using configuration file at: " + configFilePath)
76               
77       
78        def setIngestFromDate(self, ingestFromDate):
79                '''
80                Set the published date from which feed entries should be ingested
81                @param ingestFromDate: date from which documents should be ingested - in
82                format YYYY-MM-DD
83                '''
84                ingestDate = formatDateYYYYMMDD(ingestFromDate)
85                if ingestDate:
86                        self.ingestFromDate = datetime.datetime.strptime(ingestDate, 
87                                                                                                                         YEAR_FORMAT)
88
89
90        def ingestDIFDocument(self, browseURL):
91                '''
92                Retrieve the DIF specified at the browseURL, then ingest
93                the document into postgres
94        @param browseURL: url to retrieve doc from
95                '''
96                logging.info("Retrieving Atom document from url: '%s'" %browseURL)
97               
98                self._no_files_ingested = 0
99                self._no_files_changed = 0
100                self._no_files_deleted = 0
101                self._no_problem_files = 0
102       
103                #if not self.feedClient:
104                #       self.__setupFeedClient()
105
106                # get the required info out of the URL
107                ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
108                no = ndgObject(ndgURI)
109               
110                # set up things for the provider of the data
111                dataCentre = no.repository.split('.')[0]
112               
113                # check if we're to ingest data from this data centre
114                if self.dataCentrePoll:
115                        if dataCentre != self.dataCentrePoll:
116                                logging.info("DIF doc is from a different data centre ('%s') to \
117                                        that being polled for ('%s') - skipping" %(dataCentre, self.dataCentrePoll))
118                                return
119               
120                # only bother reloading the data centre info, if we're really interested
121                if not self.dataCentre or (self.dataCentre and self.dataCentre != dataCentre):
122                        self.getConfigDetails(dataCentre)
123               
124                self.dataCentre = dataCentre
125                self._setupDataCentreDirs()
126               
127               
128                difFilename = self.originals_dir + no.localID + '.xml'         
129               
130                #SJD - encapsulate so catch errors - some problems with feeds.
131                try:
132                       
133                        doc = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + self.dbClient.getNDGDoc(no.repository, no.schema, no.localID,
134                                      targetCollection=dc.DIF_COLLECTION_PATH + no.repository)                 
135                       
136                        FileUtilities.createFile(difFilename, doc)
137                        print "*****************************************************************************   1"                       
138                        numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, False)                           
139                        print "*****************************************************************************   2"
140                        if numfilesproc != 0:
141                                reportLine = " Successfully obtained " + no.localID + "from feed and ingested document into database\n"
142                        else:
143                                reportLine = " Could NOT obtain " + no.localID + "from feed ! (check eXist ATOM feed...?)\n"
144                                               
145                except:
146                        reportLine =  " Error encountered : " + difFilename + " from feed (id = " + no.localID + " )\n"
147                       
148                        logging.error(str(time.asctime()) + " Could not ingest: " + difFilename + " (id = " + no.localID + " )")
149                       
150               
151                self.record.append(str(time.asctime()) + reportLine)
152                       
153               
154                logging.info("---------------------------------------------------------------------------------------------------")
155                #logging.info(processingReport)       # dont need this
156                logging.info("Document " + no.localID)
157                logging.info("(report: " + reportLine + ")")
158                logging.info("---------------------------------------------------------------------------------------------------")
159               
160                #return record list of transactions
161               
162        def __setupFeedClient(self):
163                '''
164                Set up a feed client - to allow use of RESTful interface to retrieve
165                atoms referenced in the feed
166                '''
167                self.feedClient = feedClient("", eXistDBHostname = self.eXistDBHostname,
168                                                                         eXistPortNo = self.eXistPortNo)
169
170        def processFeed(self, feedUrl):
171                '''
172                Poll the atom feed and ingest any new data found
173                NB, this is an infinite loop to allow permanent polling
174                @param feedUrl: url to the feed to poll
175                '''
176               
177                #setup log file for checking history of ingests etc
178                #self.summaryFileName = "/home/badc/discovery_docs/ingestDocs/data/FeedIngestSummary.txt"       
179                self.summaryFileName = self.processingDict['reporting_directory'] + 'FeedIngestSummary.txt'
180                       
181                self.summaryFile = open(self.summaryFileName,'a')   
182               
183                self.record = []
184                 
185               
186                # NB, only check the feed every five minutes
187                if time.time() > self.refreshTime:
188                        logging.info("- polling for new data...")
189                        feed = feedparser.parse(feedUrl)
190                        # check for new entries
191                        newEntryNumber = len(feed.entries)
192                        if  newEntryNumber > self.entryNumber:
193                               
194                                logging.info("New data found - checking if this has been published after specified ingest date...")
195                               
196                                for i in range(self.entryNumber, newEntryNumber):
197                                        if self.__isEntryIngestible(feed.entries[i]):
198                                                browseURL = feed.entries[i].content[0].src
199                                                self.ingestDIFDocument(browseURL) 
200                       
201                                self.entryNumber = len(feed.entries)
202                               
203                        #write this runs entries to log file
204                        for line in self.record:                               
205                                self.summaryFile.write(line)
206                               
207                        self.refreshTime = time.time() + self.pollInterval
208                        logging.info("- finished polling for new data")
209
210       
211        def pollFeed(self, feedUrl):
212                '''
213                Poll the atom feed and ingest any new data found
214                NB, this is an infinite loop to allow permanent polling
215                @param feedUrl: url to the feed to poll
216                '''
217                self.refreshTime = 0.
218                self.entryNumber = 0
219
220                while (1 == 1):
221                        self.processFeed(feedUrl)
222
223
224        def __isEntryIngestible(self, entry):
225                '''
226                Check if a feed entry is valid for ingestion, return True, if so
227                - to be ingested, the entry must be later than the specified
228                'ingest from' date and must represent a DIF record
229                @param entry: feedparser.entry from a feed to check
230                @return True, if entry should be ingested, false otherwise
231                '''
232                # firstly, check the entry is a DIF record
233                if entry.title.startswith(feedClient.DIF_ENTRY_TITLE):
234                        logging.info("- entry represents a DIF record - now check its date")
235                                                       
236                        # now check the entry is published after the specified ingest date
237                        entryDate = datetime.datetime(*entry.updated_parsed[0:3])
238                        if self.ingestFromDate < entryDate:
239                                logging.info("- data is new - so valid for ingesting")
240                                return True
241                        else:
242                                logging.info("- data is before the ingest time period")
243                else:
244                        logging.info("- entry doesn't represent a DIF record")
245                logging.info("- data is not valid for ingesting")
246                return False
247
248
249        def usage(self):
250                '''
251                Display input params for the script
252                '''
253                print "++++++++++++++++++++++++++++++++++++++++++++++++++++" 
254                print "Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]"
255                print "              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]"
256                print " - where:\n <feed> is the atom feed to ingest data from; options are:"
257                print " -v - verbose mode for output logging"
258                print " -d - debug mode for output logging"
259                print " \nand keywords are:"
260                print " interval - interval, in seconds, at which to retrieve data from the feed"
261                print " ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time"
262                print " eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'"
263                print " eXistPortNo - port number used by the eXist DB - defaults to '8080'"
264                print " dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested"
265                print "++++++++++++++++++++++++++++++++++++++++++++++++++++" 
266                sys.exit(2)
267
268
269        def __init__(self, interval = None, ingestFromDate = None, 
270                                 eXistDBHostname = 'chinook.badc.rl.ac.uk', 
271                                 eXistPortNo = '8080',
272                                 configFileName = 'passwords.txt', 
273                                 dataCentrePoll = None,
274                                 oaiConfigFileName = 'oai_document_ingester.config'):
275                '''
276                Set up object
277                @keyword interval: polling interval, in seconds, to set the feed polling to
278                - default is 300s
279                @keyword ingestFromDate: date from which to ingest docs - i.e. do not ingest
280                any feed entries before this date - in format, YYYY-MM-DD
281        @keyword eXistDBHostname: name of eXist DB to use
282        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
283        @keyword configFileName : password file to use - NB, this should
284        have contents of format (NB, space delimiter):
285        dbName userID password
286        Default is 'passwords.txt'.
287        to '8080' if not set
288        @keyword dataCentrePoll: to specify a datacentre to poll data for - e.g.
289        'badc' or 'neodc' - if not set, all data on the specified poll url will be retrieved
290                '''
291                self.dataCentre = None
292               
293                self.dataCentrePoll = dataCentrePoll
294               
295                if interval:
296                        self.setPollInterval(interval)
297                       
298                if ingestFromDate:
299                        self.setIngestFrom(ingestFromDate)
300               
301                self.eXistDBHostname = eXistDBHostname
302                self.eXistPortNo = eXistPortNo
303                self.configFileName = configFileName
304                self.refreshTime = 0.
305                self.entryNumber = 0
306                self._no_files_ingested = 0
307                self._no_problem_files = 0
308                self._error_messages = ''
309               
310                self.processThread = 'FEED'
311               
312                self.deName = VTD().TERM_DATA[VTD.DE_TERM].title
313
314                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
315               
316                #now use config info
317                '''
318                self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
319               
320                #self._code_dir = "/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/" # this is the base dir that the script is ran from
321                #self._base_dir = "/home/badc/discovery_docs/ingestDocs/"
322               
323                self._code_dir = self.processingDict['code_directory']
324                self._base_dir = self.processingDict['base_directory']
325               
326                self._databaseConfigurationFile = self.processingDict['ingestConfig']
327                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
328                               
329                '''
330                #initialise these for now.
331                self._code_dir = None
332                self._base_dir = None
333               
334                self._databaseConfigurationFile = None
335                self._ndgRedirectURL = None
336               
337        def runPoll(self, feedURL):
338                '''
339                Set up the feed poll and start it running
340                @param feedURL: url to poll
341                - NB, the feedURL will typically be something like
342                http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
343                '''
344                self.setupPoll()
345                self.pollFeed(feedURL)
346               
347
348        def setupPoll(self):
349                '''
350                Set up everything ready to run the polling client
351                '''
352                # NB, could get the docs via RESTful interface in feed client; currently
353                # doing this via document retrieve xmlrpc interface
354                #self.__setupFeedClient()
355                #set up the configuration needed for running everything else
356                self.processingDict = self.getProcessingConfig(self.oaiEditorConfig)
357               
358                self._code_dir = self.processingDict['code_directory']
359                self._base_dir = self.processingDict['base_directory']
360               
361                self._databaseConfigurationFile = self.processingDict['ingestConfig']
362                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
363               
364                self._getPostgresDBConnection()
365                self.__getDBConnection(self.eXistDBHostname, self.configFileName)
366                self._setupXQueries()
367               
368               
369               
370       
371if __name__=="__main__":
372        print "================================="
373        print "RUNNING: FeedDocumentIngester.py"
374        ingester = FeedDocumentIngester()       
375        args = ingester._setupCmdLineOptions()
376        ingester.runPoll(args[0])
Note: See TracBrowser for help on using the repository browser.