source: TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py @ 5586

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/TI01-discovery/branches/ingestAutomation-upgrade/OAIBatch/feeddocumentingester.py@5586
Revision 5586, 10.6 KB checked in by sdonegan, 10 years ago (diff)

Debugging integration

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2'''
3 Class to ingest documents from the eXist DB - by polling its feed and doing
4 ingests when updates are detected
5 @author: C Byrom, Tessella Jan 09
6'''
7import os, sys, logging, time, feedparser, datetime
8from ndg.common.src.lib.utilities import formatDateYYYYMMDD, YEAR_FORMAT
9import ndg.common.src.lib.fileutilities as FileUtilities
10from ndg.common.src.clients.xmldb.eXist.feedclient import FeedClient as feedClient
11from abstractdocumentingester import AbstractDocumentIngester
12from ndg.common.src.models.vocabtermdata import VocabTermData as VTD
13from ndg.common.src.lib.atomutilities import *
14from ndg.common.src.models.ndgObject import ndgObject
15from ndg.common.src.clients.xmldb.eXist.searchclient import SearchClient
16import ndg.common.src.clients.xmldb.eXist.dbconstants as dc
17
class FeedDocumentIngester(AbstractDocumentIngester):
    '''
    Class to handle the ingest of files from an eXist feed

    The atom feed is polled at a configurable interval; every new entry that
    represents a DIF record, updated after the 'ingest from' date, is
    retrieved from the eXist DB, written to the data centre's originals dir
    and handed to self._convertAndIngestFiles for ingest.
    '''
    INTERVAL_ERROR = "Interval, '%s', is not a float or an int"

    # interval, in seconds, with which to check the feed for updates
    pollInterval = 300.0

    # published time from which feed entries will be ingested
    # NB, this is evaluated once, when the class is defined - i.e. it is
    # effectively the time at which the module was loaded
    ingestFromDate = datetime.datetime.now()

    # temp dir to create docs for running transforms on
    TEMP_DIR_NAME = "feedPollTempDir"

    # needs this for compatibility with conventional ingestion
    indFileToIngest = ""

    def __getDBConnection(self, dBHostname, configFileName):
        '''
        Get the default DB connection - by reading in data from the db config
        file - to the eXist DB to get feed docs from; the resulting client is
        stored as self.dbClient
        @param dBHostname: the hostname of the eXist DB to use for the ingest
        @param configFileName: name of config file to get DB details from
        '''
        logging.info("Setting up connection to eXist DB")
        self.dbClient = SearchClient(dbHostName = dBHostname,
                                     configFileName = configFileName)
        logging.info("Connection to eXist now set up")


    def setPollInterval(self, interval):
        '''
        Set the polling time interval
        @param interval: an int or float - or something that float() can
        convert, e.g. a numeric string - representing the time, in seconds,
        between checking the atom feed for updates
        @raise ValueError: if the interval cannot be interpreted as a number
        '''
        if not isinstance(interval, (int, float)):
            # e.g. a string from the command line; anything float() rejects
            # is reported with the class' own error message
            try:
                interval = float(interval)
            except (TypeError, ValueError):
                raise ValueError(self.INTERVAL_ERROR % str(interval))

        self.pollInterval = interval


    def setIngestFromDate(self, ingestFromDate):
        '''
        Set the published date from which feed entries should be ingested
        @param ingestFromDate: date from which documents should be ingested - in
        format YYYY-MM-DD
        '''
        ingestDate = formatDateYYYYMMDD(ingestFromDate)
        if ingestDate:
            self.ingestFromDate = datetime.datetime.strptime(ingestDate,
                                                             YEAR_FORMAT)
        else:
            # don't drop the request silently - the current value is retained
            logging.warning("Could not interpret ingest from date, '%s' "
                            "- keeping current value, '%s'",
                            ingestFromDate, self.ingestFromDate)


    def ingestDIFDocument(self, browseURL):
        '''
        Retrieve the DIF specified at the browseURL, then ingest
        the document into postgres
        @param browseURL: url to retrieve doc from
        '''
        logging.info("Retrieving Atom document from url: '%s'" % browseURL)

        # reset the per-document counters
        self._no_files_ingested = 0
        self._no_files_changed = 0
        self._no_files_deleted = 0
        self._no_problem_files = 0

        # get the required info out of the URL
        ndgURI = browseURL.split(VTD.BROWSE_STEM_URL)[-1]
        no = ndgObject(ndgURI)

        # set up things for the provider of the data
        dataCentre = no.repository.split('.')[0]

        # check if we're to ingest data from this data centre
        if self.dataCentrePoll and dataCentre != self.dataCentrePoll:
            logging.info("DIF doc is from a different data centre ('%s') to "
                         "that being polled for ('%s') - skipping",
                         dataCentre, self.dataCentrePoll)
            return

        # only bother reloading the data centre info, if we're really interested
        if not self.dataCentre or self.dataCentre != dataCentre:
            self.getConfigDetails(dataCentre)

        self.dataCentre = dataCentre
        self._setupDataCentreDirs()

        doc = self.dbClient.getNDGDoc(no.repository, no.schema, no.localID,
                                      targetCollection=dc.DIF_COLLECTION_PATH)

        # create a temporary file to allow the ingest to do the various
        # xquery transforms on
        # NB, assumes self.originals_dir ends with a path separator - it is
        # set up outside this class
        difFilename = self.originals_dir + no.localID + '.dif'
        FileUtilities.createFile(difFilename, doc)

        numfilesproc, processingReport = self._convertAndIngestFiles(
            self.originals_dir, self.discovery_dir, self.dataCentre)

        # for feed ingester, just log report OP
        logging.info("----------------------------------------------------------------")
        logging.info(processingReport)
        logging.info("Document fully ingested into discovery service")
        logging.info("----------------------------------------------------------------")


    def __setupFeedClient(self):
        '''
        Set up a feed client - to allow use of RESTful interface to retrieve
        atoms referenced in the feed
        '''
        self.feedClient = feedClient("", eXistDBHostname = self.eXistDBHostname,
                                     eXistPortNo = self.eXistPortNo)

    def processFeed(self, feedUrl):
        '''
        Do a single check of the atom feed - if the poll interval has elapsed
        since the last check - and ingest any new, ingestible entries found.
        Returns immediately, doing nothing, if the next poll is not yet due.
        @param feedUrl: url to the feed to poll
        '''
        # NB, only check the feed once per poll interval
        if time.time() > self.refreshTime:
            logging.info("- polling for new data...")
            feed = feedparser.parse(feedUrl)
            # check for new entries - i.e. those beyond the number already seen
            newEntryNumber = len(feed.entries)
            if newEntryNumber > self.entryNumber:
                logging.info("New data found - checking if this has been published after specified ingest date...")

                for entry in feed.entries[self.entryNumber:newEntryNumber]:
                    if self.__isEntryIngestible(entry):
                        browseURL = entry.content[0].src
                        self.ingestDIFDocument(browseURL)

                self.entryNumber = len(feed.entries)

            self.refreshTime = time.time() + self.pollInterval
            logging.info("- finished polling for new data")


    def pollFeed(self, feedUrl):
        '''
        Poll the atom feed and ingest any new data found
        NB, this is an infinite loop to allow permanent polling
        @param feedUrl: url to the feed to poll
        '''
        self.refreshTime = 0.
        self.entryNumber = 0

        logging.info("Start polling for atom feed updates")
        while True:
            self.processFeed(feedUrl)

            # processFeed does nothing until refreshTime is reached, so sleep
            # for the remaining time rather than spinning the CPU
            delay = self.refreshTime - time.time()
            if delay > 0:
                time.sleep(delay)


    def __isEntryIngestible(self, entry):
        '''
        Check if a feed entry is valid for ingestion, return True, if so
        - to be ingested, the entry must be later than the specified
        'ingest from' date and must represent a DIF record
        @param entry: feedparser.entry from a feed to check
        @return True, if entry should be ingested, false otherwise
        '''
        # firstly, check the entry is a DIF record
        if entry.title.startswith(feedClient.DIF_ENTRY_TITLE):
            logging.info("- entry represents a DIF record - now check its date")

            updated = getattr(entry, 'updated_parsed', None)
            if updated:
                # now check the entry is published after the specified ingest
                # date; use the full timestamp, not just the date part -
                # otherwise entries updated later on the 'ingest from' day
                # itself would be treated as midnight and rejected
                # NOTE(review): feedparser is understood to normalise parsed
                # dates to UTC, whereas the default ingestFromDate is local
                # time - confirm this offset is acceptable
                entryDate = datetime.datetime(*updated[0:6])
                if self.ingestFromDate < entryDate:
                    logging.info("- data is new - so valid for ingesting")
                    return True
                else:
                    logging.info("- data is before the ingest time period")
            else:
                logging.warning("- entry has no parsable updated date")
        else:
            logging.info("- entry doesn't represent a DIF record")
        logging.info("- data is not valid for ingesting")
        return False


    def usage(self):
        '''
        Display input params for the script, then exit with status 2
        '''
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        print("Usage: python feeddocumentingester.py [OPTION] <feed> [interval=..], [ingestFromDate=..]")
        print("              [eXistDBHostname=..], [eXistPortNo=..], [dataCentrePoll=..]")
        print(" - where:\n <feed> is the atom feed to ingest data from; options are:")
        print(" -v - verbose mode for output logging")
        print(" -d - debug mode for output logging")
        print(" \nand keywords are:")
        print(" interval - interval, in seconds, at which to retrieve data from the feed")
        print(" ingestFromDate - date, in format, 'YYYY-MM-DD', from which documents should be ingested - if not set, ingest date is taken as the current time")
        print(" eXistDBHostname - name of eXist DB to retrieve data from - NB, this will likely be where the feed is based, too - default is 'chinook.badc.rl.ac.uk'")
        print(" eXistPortNo - port number used by the eXist DB - defaults to '8080'")
        print(" dataCentrePoll - data centre whose documents should be polled for - e.g 'badc', 'neodc' - if not set, all documents on a feed will be ingested")
        print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
        sys.exit(2)


    def __init__(self, interval = None, ingestFromDate = None,
                 eXistDBHostname = 'chinook.badc.rl.ac.uk',
                 eXistPortNo = '8080',
                 configFileName = 'passwords.txt',
                 dataCentrePoll = None):
        '''
        Set up object
        @keyword interval: polling interval, in seconds, to set the feed polling to
        - default is 300s
        @keyword ingestFromDate: date from which to ingest docs - i.e. do not ingest
        any feed entries before this date - in format, YYYY-MM-DD
        @keyword eXistDBHostname: name of eXist DB to use
        @keyword eXistPortNo: Port number that the eXist DB is exposed by - defaults
        to '8080' if not set
        @keyword configFileName : password file to use - NB, this should
        have contents of format (NB, space delimiter):
        dbName userID password
        Default is 'passwords.txt'.
        @keyword dataCentrePoll: to specify a datacentre to poll data for - e.g.
        'badc' or 'neodc' - if not set, all data on the specified poll url will be retrieved
        '''
        self.dataCentre = None

        self.dataCentrePoll = dataCentrePoll

        if interval:
            self.setPollInterval(interval)

        if ingestFromDate:
            self.setIngestFromDate(ingestFromDate)

        self.eXistDBHostname = eXistDBHostname
        self.eXistPortNo = eXistPortNo
        self.configFileName = configFileName
        self.refreshTime = 0.
        self.entryNumber = 0
        self._no_files_ingested = 0
        self._no_problem_files = 0
        self._error_messages = ''

        self.deName = VTD().TERM_DATA[VTD.DE_TERM].title

        self._base_dir = os.getcwd() + "/" # this is the base dir that the script is ran from


    def runPoll(self, feedURL):
        '''
        Set up the feed poll and start it running
        @param feedURL: url to poll
        - NB, the feedURL will typically be something like
        http://chinook.badc.rl.ac.uk:8080/exist/atom/content/db/DIF
        '''
        self.setupPoll()
        self.pollFeed(feedURL)


    def setupPoll(self):
        '''
        Set up everything ready to run the polling client
        '''
        # NB, could get the docs via RESTful interface in feed client; currently
        # doing this via document retrieve xmlrpc interface
        #self.__setupFeedClient()
        self._getPostgresDBConnection()
        self.__getDBConnection(self.eXistDBHostname, self.configFileName)
        self._setupXQueries()
289       
if __name__ == "__main__":
        # script entry point: announce start-up on stdout
        print("=================================")
        print("RUNNING: FeedDocumentIngester.py")

        feedIngester = FeedDocumentIngester()
        # _setupCmdLineOptions is not defined in this file - presumably it
        # parses the command line; the first item it returns is used as the
        # url of the feed to poll
        cmdLineArgs = feedIngester._setupCmdLineOptions()
        feedUrl = cmdLineArgs[0]
        feedIngester.runPoll(feedUrl)
Note: See TracBrowser for help on using the repository browser.