source: TI01-discovery-Ingest/trunk/v3n_NDG3/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py @ 6206

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery-Ingest/trunk/v3n_NDG3/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@6206
Revision 6206, 12.5 KB checked in by sdonegan, 10 years ago (diff)

Updated feed ingester with latest config

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Class to ingest documents from the eXist DB - by polling its feed and doing
4 ingests when updates are detected
5 @author: C Byrom, Tessella Jan 09
6'''
7import os, sys, logging, time, feedparser, datetime
8
9#TODO take this out for production (fudge to get logging to work.)
10logging.basicConfig(level=logging.DEBUG,
11                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
12
13from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
14import ndg.common.src.lib.fileutilities as FileUtilities
15from ndg.common.src.clients.xmldb.eXist.feedclient import FeedClient as feedClient
16from abstractdocumentingester import AbstractDocumentIngester
17from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
18from ndg.common.src.lib.atomutilities import *
19from ndg.common.src.models.ndgObject import ndgObject
20from ndg.common.src.clients.xmldb.eXist.searchclient import SearchClient
21import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
22
23class FeedDocumentIngester(AbstractDocumentIngester):
24        '''
25        Class to handle the ingest of files from an eXist feed
26        '''
27        INTERVAL_ERROR = "Interval, '%s', is not a float or an int"
28       
29        # interval with which to check the feed for updates
30        pollInterval = 300.0
31       
32        # published time from which feed entries will be ingested
33        ingestFromDate = datetime.datetime.now()
34
35        # temp dir to create docs for running transforms on
36        TEMP_DIR_NAME = "feedPollTempDir"
37       
38        #needs this for compatibility with conventional ingestion
39        indFileToIngest=""
40
41        def __getDBConnection(self, dBHostname, configFileName):
42                '''
43                Get the default DB connection - by reading in data from the db config file
44                - to eXist DB to get feed docs from
45                @param eXistDBHostname: the hostname of the eXist DB to use for the ingest
46                @param configFileName: name of config file to get DB details from
47                '''
48                logging.info("Setting up connection to eXist DB")
49                self.dbClient = SearchClient(dbHostName = dBHostname,
50                                                                         configFileName = configFileName)
51                logging.info("Connection to eXist now set up")
52
53       
54        def setPollInterval(self, interval):
55                '''
56                Set the polling time interval
57                @param interval: an int or float representing the time between checking
58                the atom feed for updates
59                '''
60                if isinstance(interval, str) or isinstance(interval, unicode):
61                        interval = float(interval)
62                       
63                if isinstance(interval, int) or isinstance(interval, float):
64                        self.pollInterval = interval
65                else:
66                        raise ValueError(self.INTERVAL_ERROR %str(interval))
67
68       
69        def setIngestFromDate(self, ingestFromDate):
70                '''
71                Set the published date from which feed entries should be ingested
72                @param ingestFromDate: date from which documents should be ingested - in
73                format YYYY-MM-DD
74                '''
75                ingestDate = formatDateYYYYMMDD(ingestFromDate)
76                if ingestDate:
77                        self.ingestFromDate = datetime.datetime.strptime(ingestDate, 
78                                                                                                                         YEAR_FORMAT)
79
80
81        def ingestDIFDocument(self, browseURL):
82                '''
83                Retrieve the DIF specified at the browseURL, then ingest
84                the document into postgres
85        @param browseURL: url to retrieve doc from
86                '''
87                logging.info("Retrieving Atom document from url: '%s'" %browseURL)
88               
89                self._no_files_ingested = 0
90                self._no_files_changed = 0
91                self._no_files_deleted = 0
92                self._no_problem_files = 0
93       
94                #if not self.feedClient:
95                #       self.__setupFeedClient()
96
97                # get the required info out of the URL
98                ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
99                no = ndgObject(ndgURI)
100               
101                # set up things for the provider of the data
102                dataCentre = no.repository.split('.')[0]
103               
104                # check if we're to ingest data from this data centre
105                if self.dataCentrePoll:
106                        if dataCentre != self.dataCentrePoll:
107                                logging.info("DIF doc is from a different data centre ('%s') to \
108                                        that being polled for ('%s') - skipping" %(dataCentre, self.dataCentrePoll))
109                                return
110               
111                # only bother reloading the data centre info, if we're really interested
112                if not self.dataCentre or (self.dataCentre and self.dataCentre != dataCentre):
113                        self.getConfigDetails(dataCentre)
114               
115                self.dataCentre = dataCentre
116                self._setupDataCentreDirs()
117               
118               
119                difFilename = self.originals_dir + no.localID + '.xml'         
120               
121                #SJD - encapsulate so catch errors - some problems with feeds.
122                try:
123                       
124                        doc = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + self.dbClient.getNDGDoc(no.repository, no.schema, no.localID,
125                                      targetCollection=dc.DIF_COLLECTION_PATH + no.repository)                 
126                       
127                        FileUtilities.createFile(difFilename, doc)                             
128                        numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, False)                           
129                       
130                        if numfilesproc != 0:
131                                reportLine = " Successfully obtained " + no.localID + "from feed and ingested document into database\n"
132                        else:
133                                reportLine = " Could NOT obtain " + no.localID + "from feed ! (check eXist ATOM feed...?)\n"
134                                               
135                except:
136                        reportLine =  " Error encountered : " + difFilename + " from feed (id = " + no.localID + " )\n"
137                       
138                        logging.error(str(time.asctime()) + " Could not ingest: " + difFilename + " (id = " + no.localID + " )")
139                       
140               
141                self.record.append(str(time.asctime()) + reportLine)
142                       
143               
144                logging.info("---------------------------------------------------------------------------------------------------")
145                #logging.info(processingReport)       # dont need this
146                logging.info("Document " + no.localID)
147                logging.info("(report: " + reportLine + ")")
148                logging.info("---------------------------------------------------------------------------------------------------")
149               
150                #return record list of transactions
151               
152        def __setupFeedClient(self):
153                '''
154                Set up a feed client - to allow use of RESTful interface to retrieve
155                atoms referenced in the feed
156                '''
157                self.feedClient = feedClient("", eXistDBHostname = self.eXistDBHostname,
158                                                                         eXistPortNo = self.eXistPortNo)
159
160        def processFeed(self, feedUrl):
161                '''
162                Poll the atom feed and ingest any new data found
163                NB, this is an infinite loop to allow permanent polling
164                @param feedUrl: url to the feed to poll
165                '''
166               
167                #setup log file for checking history of ingests etc
168                #self.summaryFileName = "/home/badc/discovery_docs/ingestDocs/data/FeedIngestSummary.txt"       
169                self.summaryFileName = self.processingDict['reporting_directory'] + 'FeedIngestSummary.txt'
170                       
171                self.summaryFile = open(self.summaryFileName,'a')   
172               
173                self.record = []
174                 
175               
176                # NB, only check the feed every five minutes
177                if time.time() > self.refreshTime:
178                        logging.info("- polling for new data...")
179                        feed = feedparser.parse(feedUrl)
180                        # check for new entries
181                        newEntryNumber = len(feed.entries)
182                        if  newEntryNumber > self.entryNumber:
183                               
184                                logging.info("New data found - checking if this has been published after specified ingest date...")
185                               
186                                for i in range(self.entryNumber, newEntryNumber):
187                                        if self.__isEntryIngestible(feed.entries[i]):
188                                                browseURL = feed.entries[i].content[0].src
189                                                self.ingestDIFDocument(browseURL) 
190                       
191                                self.entryNumber = len(feed.entries)
192                               
193                        #write this runs entries to log file
194                        for line in self.record:                               
195                                self.summaryFile.write(line)
196                               
197                        self.refreshTime = time.time() + self.pollInterval
198                        logging.info("- finished polling for new data")
199
200       
201        def pollFeed(self, feedUrl):
202                '''
203                Poll the atom feed and ingest any new data found
204                NB, this is an infinite loop to allow permanent polling
205                @param feedUrl: url to the feed to poll
206                '''
207                self.refreshTime = 0.
208                self.entryNumber = 0
209
210                while (1 == 1):
211                        self.processFeed(feedUrl)
212
213
214        def __isEntryIngestible(self, entry):
215                '''
216                Check if a feed entry is valid for ingestion, return True, if so
217                - to be ingested, the entry must be later than the specified
218                'ingest from' date and must represent a DIF record
219                @param entry: feedparser.entry from a feed to check
220                @return True, if entry should be ingested, false otherwise
221                '''
222                # firstly, check the entry is a DIF record
223                if entry.title.startswith(feedClient.DIF_ENTRY_TITLE):
224                        logging.info("- entry represents a DIF record - now check its date")
225                                                       
226                        # now check the entry is published after the specified ingest date
227                        entryDate = datetime.datetime(*entry.updated_parsed[0:3])
228                        if self.ingestFromDate < entryDate:
229                                logging.info("- data is new - so valid for ingesting")
230                                return True
231                        else:
232                                logging.info("- data is before the ingest time period")
233                else:
234                        logging.info("- entry doesn't represent a DIF record")
235                logging.info("- data is not valid for ingesting")
236                return False
237
238
239        def usage(self):
240                '''
241                Display input params for the script
242                '''
243                print "++++++++++++++++++++++++++++++++++++++++++++++++++++" 
244                print "Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]"
245                print "              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]"
246                print " - where:\n <feed> is the atom feed to ingest data from; options are:"
247                print " -v - verbose mode for output logging"
248                print " -d - debug mode for output logging"
249                print " \nand keywords are:"
250                print " interval - interval, in seconds, at which to retrieve data from the feed"
251                print " ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time"
252                print " eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'"
253                print " eXistPortNo - port number used by the eXist DB - defaults to '8080'"
254                print " dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested"
255                print "++++++++++++++++++++++++++++++++++++++++++++++++++++" 
256                sys.exit(2)
257
258
259        def __init__(self, interval = None, ingestFromDate = None, 
260                                 eXistDBHostname = 'chinook.badc.rl.ac.uk', 
261                                 eXistPortNo = '8080',
262                                 configFileName = 'passwords.txt', 
263                                 dataCentrePoll = None):
264                '''
265                Set up object
266                @keyword interval: polling interval, in seconds, to set the feed polling to
267                - default is 300s
268                @keyword ingestFromDate: date from which to ingest docs - i.e. do not ingest
269                any feed entries before this date - in format, YYYY-MM-DD
270        @keyword eXistDBHostname: name of eXist DB to use
271        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
272        @keyword configFileName : password file to use - NB, this should
273        have contents of format (NB, space delimiter):
274        dbName userID password
275        Default is 'passwords.txt'.
276        to '8080' if not set
277        @keyword dataCentrePoll: to specify a datacentre to poll data for - e.g.
278        'badc' or 'neodc' - if not set, all data on the specified poll url will be retrieved
279                '''
280                self.dataCentre = None
281               
282                self.dataCentrePoll = dataCentrePoll
283               
284                if interval:
285                        self.setPollInterval(interval)
286                       
287                if ingestFromDate:
288                        self.setIngestFrom(ingestFromDate)
289               
290                self.eXistDBHostname = eXistDBHostname
291                self.eXistPortNo = eXistPortNo
292                self.configFileName = configFileName
293                self.refreshTime = 0.
294                self.entryNumber = 0
295                self._no_files_ingested = 0
296                self._no_problem_files = 0
297                self._error_messages = ''
298               
299                self.processThread = 'FEED'
300               
301                self.deName = VTD().TERM_DATA[VTD.DE_TERM].title
302
303                #self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
304               
305                #now use config info
306                self.processingDict = self.getProcessingConfig('oai_document_ingester.config')
307               
308                #self._code_dir = "/home/badc/buildouts/oai_document_ingester/ingestAutomation-upgrade/OAIBatch/" # this is the base dir that the script is ran from
309                #self._base_dir = "/home/badc/discovery_docs/ingestDocs/"
310               
311                self._code_dir = self.processingDict['code_directory']
312                self._base_dir = self.processingDict['base_directory']
313               
314                self._databaseConfigurationFile = self.processingDict['ingestConfig']
315                self._ndgRedirectURL = self.processingDict['NDG_redirect_URL']
316               
317               
318               
319        def runPoll(self, feedURL):
320                '''
321                Set up the feed poll and start it running
322                @param feedURL: url to poll
323                - NB, the feedURL will typically be something like
324                http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
325                '''
326                self.setupPoll()
327                self.pollFeed(feedURL)
328               
329
330        def setupPoll(self):
331                '''
332                Set up everything ready to run the polling client
333                '''
334                # NB, could get the docs via RESTful interface in feed client; currently
335                # doing this via document retrieve xmlrpc interface
336                #self.__setupFeedClient()
337                self._getPostgresDBConnection()
338                self.__getDBConnection(self.eXistDBHostname, self.configFileName)
339                self._setupXQueries()
340       
341if __name__=="__main__":
342        print "================================="
343        print "RUNNING: FeedDocumentIngester.py"
344        ingester = FeedDocumentIngester()
345        args = ingester._setupCmdLineOptions()
346        ingester.runPoll(args[0])
Note: See TracBrowser for help on using the repository browser.