
Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@5585
Revision 5585, 10.5 KB, checked in by sdonegan, 11 years ago

Updated feed handling to use new reporting methods as developed for conventional ingestion

  • Property svn:executable set to *
#!/usr/bin/env python
'''
 Class to ingest documents from the eXist DB - by polling its feed and doing
 ingests when updates are detected
 @author: C Byrom, Tessella Jan 09
'''
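
# Example programmatic use (a sketch - the feed URL is the typical one noted in the
# runPoll docstring; the interval and date values are illustrative):
#   ingester = FeedDocumentIngester(interval = 600, ingestFromDate = '2009-01-01')
#   ingester.runPoll('http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF')
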
import os, sys, logging, time, feedparser, datetime
from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
import ndg.common.src.lib.fileutilities as FileUtilities
from ndg.common.src.clients.xmldb.eXist.feedclient import FeedClient as feedClient
from abstractdocumentingester import AbstractDocumentIngester
from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
from ndg.common.src.lib.atomutilities import *
from ndg.common.src.models.ndgObject import ndgObject
from ndg.common.src.clients.xmldb.eXist.searchclient import SearchClient
import ndg.common.src.clients.xmldb.eXist.dbconstants as dc

class FeedDocumentIngester(AbstractDocumentIngester):
        '''
        Class to handle the ingest of files from an eXist feed
        '''
        INTERVAL_ERROR = "Interval, '%s', is not a float or an int"

        # interval with which to check the feed for updates
        pollInterval = 300.0

        # published time from which feed entries will be ingested
        ingestFromDate = datetime.datetime.now()

        # temp dir to create docs for running transforms on
        TEMP_DIR_NAME = "feedPollTempDir"

        # needs this for compatibility with conventional ingestion
        indFileToIngest = ""

        def __getDBConnection(self, dBHostname, configFileName):
                '''
                Get the default DB connection - by reading in data from the db config file
                - to the eXist DB to get feed docs from
                @param dBHostname: the hostname of the eXist DB to use for the ingest
                @param configFileName: name of config file to get DB details from
                '''
                logging.info("Setting up connection to eXist DB")
                self.dbClient = SearchClient(dbHostName = dBHostname,
                                             configFileName = configFileName)
                logging.info("Connection to eXist now set up")

        def setPollInterval(self, interval):
                '''
                Set the polling time interval
                @param interval: an int or float representing the time between checking
                the atom feed for updates
                '''
                if isinstance(interval, str) or isinstance(interval, unicode):
                        interval = float(interval)

                if isinstance(interval, int) or isinstance(interval, float):
                        self.pollInterval = interval
                else:
                        raise ValueError(self.INTERVAL_ERROR %str(interval))

        def setIngestFromDate(self, ingestFromDate):
                '''
                Set the published date from which feed entries should be ingested
                @param ingestFromDate: date from which documents should be ingested - in
                format YYYY-MM-DD
                '''
                ingestDate = formatDateYYYYMMDD(ingestFromDate)
                if ingestDate:
                        self.ingestFromDate = datetime.datetime.strptime(ingestDate,
                                                                         YEAR_FORMAT)

        def ingestDIFDocument(self, browseURL):
                '''
                Retrieve the DIF specified at the browseURL, then ingest
                the document into postgres
                @param browseURL: url to retrieve doc from
                '''
                logging.info("Retrieving Atom document from url: '%s'" %browseURL)

                #if not self.feedClient:
                #       self.__setupFeedClient()

                # get the required info out of the URL
                ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
                no = ndgObject(ndgURI)

                # set up things for the provider of the data
                dataCentre = no.repository.split('.')[0]

                # check if we're to ingest data from this data centre
                if self.dataCentrePoll:
                        if dataCentre != self.dataCentrePoll:
                                logging.info("DIF doc is from a different data centre ('%s') to \
                                        that being polled for ('%s') - skipping" %(dataCentre, self.dataCentrePoll))
                                return

                # only bother reloading the data centre info, if we're really interested
                if not self.dataCentre or (self.dataCentre and self.dataCentre != dataCentre):
                        self.getConfigDetails(dataCentre)

                self.dataCentre = dataCentre
                self._setupDataCentreDirs()

                doc = self.dbClient.getNDGDoc(no.repository, no.schema, no.localID,
                                              targetCollection = dc.DIF_COLLECTION_PATH)

                # create a temporary file to allow the ingest to do the various
                # xquery transforms on
                difFilename = self.originals_dir + no.localID + '.dif'
                FileUtilities.createFile(difFilename, doc)

                numfilesproc, processingReport = self._convertAndIngestFiles(self.originals_dir, self.discovery_dir, self.dataCentre)

                # for the feed ingester, just log the processing report output
                logging.info("----------------------------------------------------------------")
                logging.info(processingReport)
                logging.info("Document fully ingested into discovery service")
                logging.info("----------------------------------------------------------------")

        def __setupFeedClient(self):
                '''
                Set up a feed client - to allow use of RESTful interface to retrieve
                atoms referenced in the feed
                '''
                self.feedClient = feedClient("", eXistDBHostname = self.eXistDBHostname,
                                             eXistPortNo = self.eXistPortNo)

        def processFeed(self, feedUrl):
                '''
                Check the atom feed for new entries and ingest any new data found
                NB, this is called repeatedly from the infinite loop in pollFeed
                @param feedUrl: url to the feed to poll
                '''
                # NB, only check the feed once per poll interval (default five minutes)
                if time.time() > self.refreshTime:
                        logging.info("- polling for new data...")
                        feed = feedparser.parse(feedUrl)
                        # check for new entries
                        newEntryNumber = len(feed.entries)
                        if newEntryNumber > self.entryNumber:
                                logging.info("New data found - checking if this has been published after specified ingest date...")

                                for i in range(self.entryNumber, newEntryNumber):
                                        if self.__isEntryIngestible(feed.entries[i]):
                                                browseURL = feed.entries[i].content[0].src
                                                self.ingestDIFDocument(browseURL)

                                self.entryNumber = len(feed.entries)

                        self.refreshTime = time.time() + self.pollInterval
                        logging.info("- finished polling for new data")

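        # NB, processFeed above only contacts the feed once self.refreshTime has
        # passed; between checks it returns immediately, so the permanent loop in
        # pollFeed below makes at most one feed request per pollInterval.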
        def pollFeed(self, feedUrl):
                '''
                Poll the atom feed and ingest any new data found
                NB, this is an infinite loop to allow permanent polling
                @param feedUrl: url to the feed to poll
                '''
                self.refreshTime = 0.
                self.entryNumber = 0

                logging.info("Start polling for atom feed updates")
                while True:
                        self.processFeed(feedUrl)

        def __isEntryIngestible(self, entry):
                '''
                Check if a feed entry is valid for ingestion, return True, if so
                - to be ingested, the entry must be later than the specified
                'ingest from' date and must represent a DIF record
                @param entry: feedparser entry from a feed to check
                @return: True, if entry should be ingested, False otherwise
                '''
                # firstly, check the entry is a DIF record
                if entry.title.startswith(feedClient.DIF_ENTRY_TITLE):
                        logging.info("- entry represents a DIF record - now check its date")

                        # now check the entry is published after the specified ingest date
                        entryDate = datetime.datetime(*entry.updated_parsed[0:3])
                        if self.ingestFromDate < entryDate:
                                logging.info("- data is new - so valid for ingesting")
                                return True
                        else:
                                logging.info("- data is before the ingest time period")
                else:
                        logging.info("- entry doesn't represent a DIF record")
                logging.info("- data is not valid for ingesting")
                return False

        def usage(self):
                '''
                Display input params for the script
                '''
                print "++++++++++++++++++++++++++++++++++++++++++++++++++++"
                print "Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]"
                print "              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]"
                print " - where:\n <feed> is the atom feed to ingest data from; options are:"
                print " -v - verbose mode for output logging"
                print " -d - debug mode for output logging"
                print " \nand keywords are:"
                print " interval - interval, in seconds, at which to retrieve data from the feed"
                print " ingestFromDate - date, in format 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time"
                print " eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'"
                print " eXistPortNo - port number used by the eXist DB - defaults to '8080'"
                print " dataCentrePoll - data centre whose documents should be polled for - e.g. 'badc', 'neodc' - if not set, all documents on a feed will be ingested"
                print "++++++++++++++++++++++++++++++++++++++++++++++++++++"
                sys.exit(2)

        def __init__(self, interval = None, ingestFromDate = None,
                     eXistDBHostname = 'chinook.badc.rl.ac.uk',
                     eXistPortNo = '8080',
                     configFileName = 'passwords.txt',
                     dataCentrePoll = None):
                '''
                Set up object
                @keyword interval: polling interval, in seconds, to set the feed polling to
                - default is 300s
                @keyword ingestFromDate: date from which to ingest docs - i.e. do not ingest
                any feed entries before this date - in format, YYYY-MM-DD
                @keyword eXistDBHostname: name of eXist DB to use
                @keyword eXistPortNo: port number that the eXist DB is exposed by - defaults
                to '8080' if not set
                @keyword configFileName: password file to use - NB, this should
                have contents of format (NB, space delimiter):
                dbName userID password
                Default is 'passwords.txt'.
                @keyword dataCentrePoll: to specify a datacentre to poll data for - e.g.
                'badc' or 'neodc' - if not set, all data on the specified poll url will be retrieved
                '''
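                # Example line in the passwords file (hypothetical values, space-delimited
                # as described above): "xmldb ingestuser ingestpwd"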
                self.dataCentre = None

                self.dataCentrePoll = dataCentrePoll

                if interval:
                        self.setPollInterval(interval)

                if ingestFromDate:
                        self.setIngestFromDate(ingestFromDate)

                self.eXistDBHostname = eXistDBHostname
                self.eXistPortNo = eXistPortNo
                self.configFileName = configFileName
                self.refreshTime = 0.
                self.entryNumber = 0
                self._no_files_ingested = 0
                self._no_problem_files = 0
                self._error_messages = ''

                self.deName = VTD().TERM_DATA[VTD.DE_TERM].title

                self._base_dir = os.getcwd() + "/" # this is the base dir that the script is run from

        def runPoll(self, feedURL):
                '''
                Set up the feed poll and start it running
                @param feedURL: url to poll
                - NB, the feedURL will typically be something like
                http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
                '''
                self.setupPoll()
                self.pollFeed(feedURL)

        def setupPoll(self):
                '''
                Set up everything ready to run the polling client
                '''
                # NB, could get the docs via RESTful interface in feed client; currently
                # doing this via document retrieve xmlrpc interface
                #self.__setupFeedClient()
                self._getPostgresDBConnection()
                self.__getDBConnection(self.eXistDBHostname, self.configFileName)
                self._setupXQueries()

if __name__=="__main__":
        print "================================="
        print "RUNNING: FeedDocumentIngester.py"
        ingester = FeedDocumentIngester()
        args = ingester._setupCmdLineOptions()
        ingester.runPoll(args[0])
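
# Example invocation (a sketch - the feed URL is the typical one noted in runPoll's
# docstring and the keyword values are illustrative; see usage() for the full list):
#   python feeddocumentingester.py -v \
#       http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF \
#       interval=600 ingestFromDate=2009-01-01 dataCentrePoll=badc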