source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py @ 5600

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@5600
Revision 5600, 11.7 KB checked in by sdonegan, 11 years ago (diff)

Debugged ATOM feed ingestion to deal with revised eXist atom architecture

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Class to ingest documents from the eXist DB - by polling its feed and doing
4 ingests when updates are detected
5 @author: C Byrom, Tessella Jan 09
6'''
7import os, sys, logging, time, feedparser, datetime
8
9#TODO take this out for production (fudge to get logging to work.)
10logging.basicConfig(level=logging.DEBUG,
11                    format='%(asctime)s %(filename)s:%(lineno)d %(levelname)s %(message)s')
12
13from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
14import ndg.common.src.lib.fileutilities as FileUtilities
15from ndg.common.src.clients.xmldb.eXist.feedclient import FeedClient as feedClient
16from abstractdocumentingester import AbstractDocumentIngester
17from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
18from ndg.common.src.lib.atomutilities import *
19from ndg.common.src.models.ndgObject import ndgObject
20from ndg.common.src.clients.xmldb.eXist.searchclient import SearchClient
21import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
22
23class FeedDocumentIngester(AbstractDocumentIngester):
24        '''
25        Class to handle the ingest of files from an eXist feed
26        '''
27        INTERVAL_ERROR = "Interval, '%s', is not a float or an int"
28       
29        # interval with which to check the feed for updates
30        pollInterval = 300.0
31       
32        # published time from which feed entries will be ingested
33        ingestFromDate = datetime.datetime.now()
34
35        # temp dir to create docs for running transforms on
36        TEMP_DIR_NAME = "feedPollTempDir"
37       
38        #needs this for compatibility with conventional ingestion
39        indFileToIngest=""
40
41        def __getDBConnection(self, dBHostname, configFileName):
42                '''
43                Get the default DB connection - by reading in data from the db config file
44                - to eXist DB to get feed docs from
45                @param eXistDBHostname: the hostname of the eXist DB to use for the ingest
46                @param configFileName: name of config file to get DB details from
47                '''
48                logging.info("Setting up connection to eXist DB")
49                self.dbClient = SearchClient(dbHostName = dBHostname,
50                                                                         configFileName = configFileName)
51                logging.info("Connection to eXist now set up")
52
53       
54        def setPollInterval(self, interval):
55                '''
56                Set the polling time interval
57                @param interval: an int or float representing the time between checking
58                the atom feed for updates
59                '''
60                if isinstance(interval, str) or isinstance(interval, unicode):
61                        interval = float(interval)
62                       
63                if isinstance(interval, int) or isinstance(interval, float):
64                        self.pollInterval = interval
65                else:
66                        raise ValueError(self.INTERVAL_ERROR %str(interval))
67
68       
69        def setIngestFromDate(self, ingestFromDate):
70                '''
71                Set the published date from which feed entries should be ingested
72                @param ingestFromDate: date from which documents should be ingested - in
73                format YYYY-MM-DD
74                '''
75                ingestDate = formatDateYYYYMMDD(ingestFromDate)
76                if ingestDate:
77                        self.ingestFromDate = datetime.datetime.strptime(ingestDate, 
78                                                                                                                         YEAR_FORMAT)
79
80
81        def ingestDIFDocument(self, browseURL):
82                '''
83                Retrieve the DIF specified at the browseURL, then ingest
84                the document into postgres
85        @param browseURL: url to retrieve doc from
86                '''
87                logging.info("Retrieving Atom document from url: '%s'" %browseURL)
88               
89                self._no_files_ingested = 0
90                self._no_files_changed = 0
91                self._no_files_deleted = 0
92                self._no_problem_files = 0
93       
94                #if not self.feedClient:
95                #       self.__setupFeedClient()
96
97                # get the required info out of the URL
98                ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
99                no = ndgObject(ndgURI)
100               
101                # set up things for the provider of the data
102                dataCentre = no.repository.split('.')[0]
103               
104                # check if we're to ingest data from this data centre
105                if self.dataCentrePoll:
106                        if dataCentre != self.dataCentrePoll:
107                                logging.info("DIF doc is from a different data centre ('%s') to \
108                                        that being polled for ('%s') - skipping" %(dataCentre, self.dataCentrePoll))
109                                return
110               
111                # only bother reloading the data centre info, if we're really interested
112                if not self.dataCentre or (self.dataCentre and self.dataCentre != dataCentre):
113                        self.getConfigDetails(dataCentre)
114               
115                self.dataCentre = dataCentre
116                self._setupDataCentreDirs()
117               
118               
119                difFilename = self.originals_dir + no.localID + '.xml'         
120               
121                #SJD - encapsulate so catch errors - some problems with feeds.
122                try:
123                       
124                        doc = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + self.dbClient.getNDGDoc(no.repository, no.schema, no.localID,
125                                      targetCollection=dc.DIF_COLLECTION_PATH + no.repository)                 
126                       
127                        FileUtilities.createFile(difFilename, doc)                             
128                        numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre, False)                           
129                        self.record.append(str(time.asctime()) + " Successfully obtained " + no.localID + " and ingested (" + str(numfilesproc) + ") document")
130                       
131                except:
132                        self.record.append(str(time.asctime()) + " Could not ingest: " + difFilename + " (id = " + no.localID + " )")   
133                        logging.error(str(time.asctime()) + " Could not ingest: " + difFilename + " (id = " + no.localID + " )" )
134                       
135                                       
136                #self.summaryFile.write(self.summaryLine)
137               
138                #for feed ingester, just log report OP
139                logging.info("----------------------------------------------------------------")
140                #logging.info(processingReport)       # dont need this
141                logging.info("Document " + no.localID + " fully ingested into discovery service")
142                logging.info("----------------------------------------------------------------")
143               
144                #return record list of transactions
145               
146        def __setupFeedClient(self):
147                '''
148                Set up a feed client - to allow use of RESTful interface to retrieve
149                atoms referenced in the feed
150                '''
151                self.feedClient = feedClient("", eXistDBHostname = self.eXistDBHostname,
152                                                                         eXistPortNo = self.eXistPortNo)
153
154        def processFeed(self, feedUrl):
155                '''
156                Poll the atom feed and ingest any new data found
157                NB, this is an infinite loop to allow permanent polling
158                @param feedUrl: url to the feed to poll
159                '''
160               
161                #setup log file for checking history of ingests etc
162                self.summaryFileName = "data/FeedIngestSummary.txt"       
163                self.summaryFile = open(self.summaryFileName,'a')   
164               
165                self.record = []
166                 
167               
168                # NB, only check the feed every five minutes
169                if time.time() > self.refreshTime:
170                        logging.info("- polling for new data...")
171                        feed = feedparser.parse(feedUrl)
172                        # check for new entries
173                        newEntryNumber = len(feed.entries)
174                        if  newEntryNumber > self.entryNumber:
175                               
176                                logging.info("New data found - checking if this has been published after specified ingest date...")
177                               
178                                for i in range(self.entryNumber, newEntryNumber):
179                                        if self.__isEntryIngestible(feed.entries[i]):
180                                                browseURL = feed.entries[i].content[0].src
181                                                self.ingestDIFDocument(browseURL) 
182                       
183                                self.entryNumber = len(feed.entries)
184                               
185                        #write this runs entries to log file
186                        for line in self.record:                               
187                                self.summaryFile.write(line)
188                               
189                        self.refreshTime = time.time() + self.pollInterval
190                        logging.info("- finished polling for new data")
191
192       
193        def pollFeed(self, feedUrl):
194                '''
195                Poll the atom feed and ingest any new data found
196                NB, this is an infinite loop to allow permanent polling
197                @param feedUrl: url to the feed to poll
198                '''
199                self.refreshTime = 0.
200                self.entryNumber = 0
201
202                while (1 == 1):
203                        self.processFeed(feedUrl)
204
205
206        def __isEntryIngestible(self, entry):
207                '''
208                Check if a feed entry is valid for ingestion, return True, if so
209                - to be ingested, the entry must be later than the specified
210                'ingest from' date and must represent a DIF record
211                @param entry: feedparser.entry from a feed to check
212                @return True, if entry should be ingested, false otherwise
213                '''
214                # firstly, check the entry is a DIF record
215                if entry.title.startswith(feedClient.DIF_ENTRY_TITLE):
216                        logging.info("- entry represents a DIF record - now check its date")
217                                                       
218                        # now check the entry is published after the specified ingest date
219                        entryDate = datetime.datetime(*entry.updated_parsed[0:3])
220                        if self.ingestFromDate < entryDate:
221                                logging.info("- data is new - so valid for ingesting")
222                                return True
223                        else:
224                                logging.info("- data is before the ingest time period")
225                else:
226                        logging.info("- entry doesn't represent a DIF record")
227                logging.info("- data is not valid for ingesting")
228                return False
229
230
231        def usage(self):
232                '''
233                Display input params for the script
234                '''
235                print "++++++++++++++++++++++++++++++++++++++++++++++++++++" 
236                print "Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]"
237                print "              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]"
238                print " - where:\n <feed> is the atom feed to ingest data from; options are:"
239                print " -v - verbose mode for output logging"
240                print " -d - debug mode for output logging"
241                print " \nand keywords are:"
242                print " interval - interval, in seconds, at which to retrieve data from the feed"
243                print " ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time"
244                print " eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'"
245                print " eXistPortNo - port number used by the eXist DB - defaults to '8080'"
246                print " dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested"
247                print "++++++++++++++++++++++++++++++++++++++++++++++++++++" 
248                sys.exit(2)
249
250
251        def __init__(self, interval = None, ingestFromDate = None, 
252                                 eXistDBHostname = 'chinook.badc.rl.ac.uk', 
253                                 eXistPortNo = '8080',
254                                 configFileName = 'passwords.txt', 
255                                 dataCentrePoll = None):
256                '''
257                Set up object
258                @keyword interval: polling interval, in seconds, to set the feed polling to
259                - default is 300s
260                @keyword ingestFromDate: date from which to ingest docs - i.e. do not ingest
261                any feed entries before this date - in format, YYYY-MM-DD
262        @keyword eXistDBHostname: name of eXist DB to use
263        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
264        @keyword configFileName : password file to use - NB, this should
265        have contents of format (NB, space delimiter):
266        dbName userID password
267        Default is 'passwords.txt'.
268        to '8080' if not set
269        @keyword dataCentrePoll: to specify a datacentre to poll data for - e.g.
270        'badc' or 'neodc' - if not set, all data on the specified poll url will be retrieved
271                '''
272                self.dataCentre = None
273               
274                self.dataCentrePoll = dataCentrePoll
275               
276                if interval:
277                        self.setPollInterval(interval)
278                       
279                if ingestFromDate:
280                        self.setIngestFrom(ingestFromDate)
281               
282                self.eXistDBHostname = eXistDBHostname
283                self.eXistPortNo = eXistPortNo
284                self.configFileName = configFileName
285                self.refreshTime = 0.
286                self.entryNumber = 0
287                self._no_files_ingested = 0
288                self._no_problem_files = 0
289                self._error_messages = ''
290               
291                self.deName = VTD().TERM_DATA[VTD.DE_TERM].title
292
293                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from
294               
295               
296        def runPoll(self, feedURL):
297                '''
298                Set up the feed poll and start it running
299                @param feedURL: url to poll
300                - NB, the feedURL will typically be something like
301                http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
302                '''
303                self.setupPoll()
304                self.pollFeed(feedURL)
305               
306
307        def setupPoll(self):
308                '''
309                Set up everything ready to run the polling client
310                '''
311                # NB, could get the docs via RESTful interface in feed client; currently
312                # doing this via document retrieve xmlrpc interface
313                #self.__setupFeedClient()
314                self._getPostgresDBConnection()
315                self.__getDBConnection(self.eXistDBHostname, self.configFileName)
316                self._setupXQueries()
317       
318if __name__=="__main__":
319        print "================================="
320        print "RUNNING: FeedDocumentIngester.py"
321        ingester = FeedDocumentIngester()
322        args = ingester._setupCmdLineOptions()
323        ingester.runPoll(args[0])
Note: See TracBrowser for help on using the repository browser.