source: cows_wps/trunk/cows_wps/lib/request.py @ 7095

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/lib/request.py@7095
Revision 7095, 26.5 KB checked in by astephen, 10 years ago (diff)

fixing up jobs page and moving renderer code into renderer package

Line 
1"""
2Routines for discovering running and previous jobs.
3
4Code from get_user_jobs_info and get_jobs_running is moving here minus
5the process wrapper and response encoding.
6
7"""
8import os
9import sys
10import time
11import logging
12import traceback
13
14log = logging.getLogger(__name__)
15
16from cows_wps.process_handler.context.process_context import ProcessContext
17from cows_wps.process_handler.context.process_status import STATUS
18
19import cows_wps.sge.job_queue as job_queue
20import cows_wps.sge.job_info as job_info
21import cows_wps.sge.sge_py_int as sge_py_interface
22import cows_wps.utils.common
23
24# sqlalchemy libs for connecting to the db
25from sqlalchemy.orm import eagerload, sessionmaker
26
27import sqlalchemy
28
29# Relative import
30from cows_wps.model.orm.tables import metadata, MAX_REQUEST_ID_LENGTH, MAX_USER_ID_LENGTH
31from cows_wps.model.orm.classes import *
32
33#import the wps config dict
34from cows_wps.utils.parse_wps_config import wps_config_dict
35
36class RequestManager(object):
37    """
38    @ivar statusChangeListeners: A list of callables that are called when a
39        job changes status with the signature f(job, old_status, session).  The listeners
40        are called within an active session.
41    """
42   
43    def __init__(self, engine):
44        self.engine = engine
45        self.sessionMaker = sessionmaker(bind=engine)
46        self.statusChangeListeners = []
47   
48    def getSession(self):
49        return self.sessionMaker()
50
51    def getRequest(self, requestId, sgeUpdate=True):
52        """
53        Retrieves a Request object from the database. If sgeUpdate is True and
54        the request corresponds to an active SGE process the filesystem will be
55        queried to try and obtain the most up to date status.
56
57        The returned Request object will be detached from the session but should have
58        its .job attribute fully populated.
59
60        Will raise a ValueError if the given requestId is not found.
61       
62        @type  requestId: string
63        @param requestId: the id of the request to be retrieved
64       
65        @type    sgeUpdate: bool
66        @keyword sgeUpdate: flag indicating if the status of a running SGE job
67            should attempt to be updated by quering the process directory. 
68           
69        @rtype: cows_wps.model.orm.classes.Request
70        @return: a request object from the database
71        """
72        session = self.getSession()
73       
74        try:
75            req = self._getRequest(requestId, session)
76           
77            if req is None:
78                raise ValueError('Request %s is not found' % requestId)             
79           
80            if sgeUpdate == True:
81                #if the request is async and not finished then update it
82                if req.job.status in [STATUS.ACCEPTED, STATUS.STARTED] and \
83                   req.job.type == JOB_TYPE.SCHEDULED:
84                   
85                    self._updateRunningSGEJobStatus(req.job, session)
86               
87            session.commit()
88           
89            # a hack to make sure that the sge_job is populated before
90            # the session is closed
91            self._populateJobOnRequest(req)
92           
93        finally:
94            session.close()
95               
96        return req
97   
98    def getProcessDirForRequest(self, requestId):
99        """
100        Returns the process directory ascociated with a request.
101       
102        @type  requestId: string
103        @param requestId: the request id of the request
104       
105        @rtype:  string
106        @return: the full path to the process directory
107        """
108       
109        req = self.getRequest(requestId, sgeUpdate=False)
110        return req.job.process_dir
111
112    def _getRequest(self, requestId, session):
113        """
114        Retrieves a request from an active session
115        """
116       
117        query = session.query(Request)
118        req = query.get(requestId)
119       
120        return req
121
122    def _populateJobOnRequest(self, req):
123        "Code to make sure a request's job is populated from the session"
124        a = req.job
125       
126        if req.job.__class__ == SGEJob:
127            # a hack to make sure that the sge_job is populated before
128            # the session is closed
129            a = req.job.sge_id       
130
131    def getAllRequest(self):
132
133        # because it seems impossible to get sqlalchemy to eagarload any properties from
134        # the sge_job table i've manually written a query to extract all the relevant data
135        # in one go this data can then be used to populate request object. This avoids the
136        # need to run another query on the database for each request returned.
137       
138        s = sqlalchemy.sql.text("""SELECT r.id, r.user_id, r.job_id, j.process, j.process_dir, j.status, j.type, j.created, sj.sge_id, sj.sge_queue, sj.last_polled
139                    FROM request as r
140                    JOIN job as j ON r.job_id = j.job_id
141                    LEFT JOIN sge_job as sj ON r.job_id = sj.job_id""")
142       
143        columns = ["id", "user_id", "job_id", "process", "process_dir", "status", "type", "created", "sge_id", "sge_queue", "last_polled"]
144       
145        conn = self.engine.connect()
146        try:
147            results =  conn.execute(s).fetchall()
148        finally:
149            conn.close()
150           
151        #build up a dictionary for each row
152        rows = []
153        for r in results:
154            d = {}
155            for i in range(len(r)):
156                d[columns[i]] = r[i]
157            rows.append(d)
158       
159        #manually build the request object from the results
160        requests = []
161        for r in rows:
162            if r['type'] == JOB_TYPE.SCHEDULED:
163                job = SGEJob(r['sge_id'], r['sge_queue'], r['process'], r['process_dir'])
164                job.last_polled = cows_wps.utils.common.stringToDateTime(r['last_polled'], includeMicroseconds=True)
165            else:
166                job = InThreadJob(r['process'], r['process_dir'])
167           
168            job.status = r['status']
169            job.type = r['type']
170            job.job_id = r['job_id']
171            job.created = r['created']
172           
173            req = Request(r['id'], r['user_id'])
174            req.job = job
175            req.job_id = r['job_id']
176           
177            requests.append(req)
178       
179        return requests
180
181    def isRequestPresent(self, requestId):
182        """
183        Returns a boolean indicating if a request with the given id id present in the currently
184        connected database.
185        """
186        session = self.getSession()
187       
188        try:
189            req = self._getRequest(requestId, session)
190        finally:
191            session.close()
192           
193        return req != None
194
195
196    def updateRequestStatus(self, requestId, status):
197        """
198        Updates the status of the job in the database with the given job id.
199       
200        Will raise a ValueError if the job is not found.
201        """
202        log.debug("Updating request %s status to %s" % (requestId, status))
203        session = self.getSession()
204        try:
205            job = self._getRequest(requestId, session).job
206            self._updateJobStatus(job, status, session)
207       
208            session.commit()
209        finally:
210            session.close()
211       
212    def _updateJobStatus(self, job, status, session):
213        """
214        Updates the status of a given Job object and attaches it to the given session
215
216        Will call any statusChangeListeners atached to the manager.
217        """       
218        old_status = job.status
219        job.updateStatus(status)
220        session.add(job)
221       
222        # Notify anything that want's to know status changes
223        for f in self.statusChangeListeners:
224            f(job, old_status, session)
225       
226           
227    def updateRunningRequests(self):
228        """
229        Scans through all running requests that may need their status updating.
230       
231        Async jobs will save their status to the filesystem but do not update
232        the database directly.  This method ensures the database is up to date.
233       
234        """
235        log.debug('BEGIN updateRunningRequests()')
236       
237        session = self.getSession()
238        try:
239            ret = []
240   
241            where = sqlalchemy.or_(SGEJob.status==STATUS.ACCEPTED, SGEJob.status==STATUS.STARTED)
242                   
243            query = session.query(SGEJob).filter(where)
244                   
245            for job in query:
246               
247                try:
248                    self._updateRunningSGEJobStatus(job, session)
249                except:
250                    log.exception("Exception occurred while updating job %s" % (job.job_id,))
251                   
252                    # set the job to failed in the database
253                    log.warning("Setting job %s to FAILED." % (job.job_id,))
254                   
255                    job.updateStatus(STATUS.FAILED)
256                                     
257            session.commit()
258        finally:
259            session.close()
260
261        log.debug('END updateRunningRequests()')
262       
263    def _updateRunningSGEJobStatus(self, job, session):
264        """
265        Checks the filesystem and the SGE for any updates on the status of a given job. If
266        a different status is found the job is updated in the database.
267
268        It is only worth calling this on jobs which are ACCEPTED or STARTED as once a job is
269        COMPLETED or FAILED then its status should never be changed again.
270        """       
271#        log.debug('Checking status of Job jobid=%d' % job.job_id)
272       
273        # A running job should only have one request associated with it!
274        if len(job.requests) != 1:
275            raise RuntimeError('Running job %d is associated with multiple requests %s' % (job.job_id,
276                                                                                           job.requests))
277        req = job.requests[0]
278       
279        statusInFile = self._getJobStatusFromFile(job.process_dir)
280        updateStatusTo = None
281
282        log.debug("status in DB = %s, status In File = %s" % (job.status, statusInFile))
283
284        if self.isJobActiveInSGE(job.job_id):
285           
286            #should the database status be updated to STARTED?
287            if statusInFile == STATUS.STARTED and statusInFile != job.status:
288                updateStatusTo = STATUS.STARTED
289               
290            # if the status is anything else, should wait for the process
291            # to finish running in SGE to react.
292           
293        else: #job is not in SGE
294           
295            #has the job finished
296            if statusInFile in [STATUS.COMPLETED, STATUS.FAILED]:
297                if job.status != statusInFile:
298                    updateStatusTo = statusInFile
299            else:
300                #has the job failed to start
301                if self._hasJobFailedToStart(job.job_id):
302                    msg = "Job %s process still in %s state but isn't being run or " + \
303                          "queued by SGE and timeout of %ss has passed"
304                    log.error(msg % (job.job_id, statusInFile ,wps_config_dict['sge_queue_timeout']))
305                    log.error(" ***** Setting status to FAILED for job id=%s ***** " % (job.job_id,))
306                    updateStatusTo = STATUS.FAILED
307                else:
308                    pass # the job is still starting up
309                                   
310
311        if updateStatusTo != None:
312            self._updateJobStatus(job, updateStatusTo, session)
313           
314            if updateStatusTo != statusInFile:
315                self._updateStatusInProcessDir(job.process_dir, updateStatusTo)
316               
317            log.debug('Status of SGEJob %d updated to %s' % (job.sge_id, updateStatusTo))
318   
319    def isJobActiveInSGE(self, jobId):
320        """
321        Returns a value indicating if the job with the given jobId is queued/running in SGE.
322       
323        If no sge job corresponding to the id given is found then a ValueError is raised.
324        """
325       
326        jobInfo = self._getSGEJobInfoForJob(jobId)
327       
328        if jobInfo == None:
329            return False # the job is not known by SGE
330        elif jobInfo.state in [job_info.SUNGRID_PROC_STATUS.DELETED_RUNNING,
331                               job_info.SUNGRID_PROC_STATUS.DELETED_TRANSFERING, 
332                               job_info.SUNGRID_PROC_STATUS.DELETION]:
333            # the job is soon to be or being deleted and so isn't active
334            return False 
335        else:
336            #The job is still active in SGE
337            return True
338       
339    def _getSGEJobInfoForJob(self, jobId):
340        """
341        Returns a SGE jobInfo object for a job with the given jobId.
342       
343        If the jobId doesn't correspond to a SGEJob a ValueError is raised.
344        """
345        session = self.getSession()
346        try:
347            job = session.query(Job).get(jobId)
348           
349            if job is None:
350                raise ValueError('Job %s is not found' % jobId)
351           
352            if not isinstance(job, SGEJob):
353                raise ValueError('JobID %d does not refer to an SGE job' % jobId)
354           
355            jobInfo = job_queue.getJobInfoById(job.sge_id)
356           
357        finally:
358            session.close()
359           
360        return jobInfo
361   
362    def _updateStatusInProcessDir(self, processDir, newStatus):
363           
364        context = self._getContextFromDir(processDir)
365       
366        if context.status != newStatus:
367            context.setStatus(newStatus, "Status set to %s" % newStatus)
368            context.save()
369           
370        context.close()                   
371       
372    def _getJobStatusFromFile(self, processDir):
373        """
374        Opens the process directory for a job and read the status from the file
375        """
376        context = self._getContextFromDir(processDir)
377        status = context.status
378        context.close()
379       
380        return status
381
382
383    def getJob(self, jobId):
384        """
385        Attempts to retrieve a Job object corresponding to the given jobId from the database.
386
387        Will raise a ValueError if no job is found.
388        """       
389        session = self.getSession()
390        try:
391            job = self._getJob(jobId, session)
392       
393            if job is None:
394                raise ValueError('Job %s is not found' % jobId)
395        finally: 
396            session.close()
397       
398        return job
399   
400
401    def _getJob(self, jobId, session):
402       
403        job = session.query(Job).get(jobId)
404        return job
405
406    def isJobPresent(self, jobId):
407       
408        session = self.getSession()
409       
410        try:
411            job = self._getJob(jobId, session)
412        finally:
413            session.close()
414           
415        return job != None       
416
417    def _hasJobFailedToStart(self, jobId):
418        """
419        Attempts to determine if a job has fialed to start or is in the process of startin up.
420
421        This is done by compairing the amount of time since the 'status.txt' file creation against
422        the 'sge_queue_timeout' wps config variable.
423        """
424        job = self.getJob(jobId)
425        statusFile = os.path.join(job.process_dir, 'status.txt')
426       
427        timeSinceCreation = time.time() - os.stat(statusFile).st_ctime
428        log.debug("time since status file creation = %s" % (timeSinceCreation,))
429        if timeSinceCreation > float(wps_config_dict['sge_queue_timeout']):
430            return True
431        else:
432            return False
433
434    def addAsyncRequest(self, requestId, username, sgeJobId, identifier, context, queue=""):
435        """
436        Creates new Request and SGEJob objects and then adds them to the database.
437
438        The length of the new requestId will be checked.
439
440        Any Exceptions occurring while trying to add the request will be logged but
441        won't be thrown.
442        """
443        if len(requestId) > MAX_REQUEST_ID_LENGTH:
444            raise ValueError('Maximum RequestID length is %d, %d given' % (MAX_REQUEST_ID_LENGTH, len(requestId)))
445
446        user_id = username ###self._getUsername(context)
447
448        session = self.getSession()
449        try:
450            req = Request(requestId, user_id)
451            req.job = SGEJob(sgeJobId,queue, identifier, context.processDir)
452            req.job.updateStatus(context.status)
453
454           
455            session.add(req)
456            session.commit()
457            session.refresh(req) # refresh the requests attributes
458            a = req.job.sge_id   # make sure that the sge_job is populated before the session is closed
459           
460            log.info("Added SGE Job (requestId=%s to database." % (requestId,) )
461           
462        except:
463            log.exception("Failed to add SGE job to the database with request id: %s and SGE id: %s" %(requestId, sgeJobId))
464            session.close()
465        finally:
466            session.close()
467       
468
469        return req
470               
471    def addSyncRequest(self, requestId, username, identifier, context):
472        """
473        Creates new Request and InThreadJob objects and then adds them to the database.
474
475        The length of the new requestId will be checked.
476
477        Any Exceptions occurring while trying to add the request will be logged but
478        won't be thrown.
479        """
480        if len(requestId) > MAX_REQUEST_ID_LENGTH:
481            raise ValueError('Maximum RequestID length is %d, request %s has length %d ' % (MAX_REQUEST_ID_LENGTH, requestId, len(requestId)))
482
483        user_id = username ###self._getUsername(context)
484       
485        session = self.getSession()
486        try:
487            req = Request(requestId, user_id)
488            req.job = InThreadJob(identifier, context.processDir)
489            req.job.status = context.status
490
491            session.add(req)
492            session.commit()
493            session.refresh(req)
494            a = req.job.job_id
495       
496        except Exception, e:
497            log.exception("Failed to add request to the database with request id: %s " %requestId)
498            session.close()
499        finally:
500            session.close()
501           
502        return req
503
504    def addCachedRequest(self, requestId, cJob, username):
505        """
506        Creates a new Request object pointing to the Job object provided and inserts
507        it into the database.
508        """       
509        session = self.getSession()
510        try:
511            req = Request(requestId, username)
512   
513            req.job = cJob
514            cJobId = cJob.job_id
515   
516            session.add(req)
517            session.commit()
518   
519            context = self._getContextFromDir(req.job.process_dir)
520        finally:
521            session.close()
522
523        return context
524   
525    def cancelRequest(self, requestId):
526        """
527        Attempts to cancel the Sun Grid Engine job corresponding to the request with id requestId.
528
529        This function assumes that the ascociated job is currently queued ro running in SGE.  If the
530        Job is queued then a message will be written to status.txt noting that the process was cancelled
531        if the job is running the onCancel handler will be left to tidy up the process.
532
533        Any Exceptions raised while trying to cancel a request will be logged and not thrown.
534       
535        @returns: a tuple made up of a boolean indicating if the cancelation was successfull and
536        a message detailing why the cancellation was successfull or not.
537        """       
538        result = ()
539       
540        session = self.getSession()
541               
542        try:
543            req = self._getRequest(requestId, session)
544           
545            if req == None:
546                result = (False, 'No job with request id:%s exists' % requestId)
547               
548            else:
549
550                job = req.job
551               
552                if job.__class__ == InThreadJob:
553                    result = (False, "Job asociated with request %s is an InThreadJob, can't cancel." % requestId)
554               
555                elif job_queue.getJobInfoById(job.sge_id) == None:
556                    result = (False, "Job with sgeJobId=%s not found in current SGE jobs" % job.sge_id)
557
558                else:
559   
560                    log.debug("job status before cancelling is %s" % (job.status,))
561                   
562                    result = sge_py_interface.cancelJob(job.sge_id)
563                    log.debug("canceled job (sgeJobId=%s)" % (job.sge_id,))
564                    result = (True, "Job terminated successfully")
565                                       
566                    # if the status is Accepted then there will be no
567                    # onCancel handler to add the log entries and change
568                    # status.txt so need to do it here
569                   
570                    context = self._getContextFromDir(job.process_dir)
571                    log.debug("reading context to check status after cancelling via sge_py_int")
572                   
573                    if context.status != STATUS.FAILED:
574                       
575                        context.setStatus(STATUS.FAILED, 'Process cancelled')
576                        context.logProcessCancelled()
577                        log.warning("Set status to FAILED after cancelling.")
578                        context.save()
579                   
580                    context.close()
581                   
582                    #update the status in the database
583                   
584                    job.updateStatus(STATUS.FAILED)
585                    session.commit()
586
587        except Exception, msg:
588           
589            log.exception(msg)
590            result = (False, msg)
591           
592        finally:
593            session.close()
594       
595        return result
596
597   
598    def countSGEJobsRunning(self, userId, queue):
599        """
600        Return the number or requests running on a certain SGE queue for a given user.
601       
602        """
603        session = self.getSession()
604        try:
605            query = session.query(SGEJob).join(Request).filter_by(user_id=userId)
606            query = query.filter(tables.sge_job.c.sge_queue == queue)
607           
608            where = sqlalchemy.or_(SGEJob.status==STATUS.ACCEPTED, SGEJob.status==STATUS.STARTED)
609           
610            query = query.filter(where)
611
612            c = query.distinct().count()
613        finally:
614            session.close()
615           
616        return c
617
618    def iterContextsByUser(self, userId, runningJobs=True, limit=None):
619        """
620        Iterate over all process context objects for a given user.  The context
621        is closed automatically on each iteration.
622       
623        @type    limit: integer (default None)
624        @keyword limit: limits the number of context objects returned, returns
625           the context objects for the most recently created jobs.
626       
627        """
628   
629        session = self.getSession()
630        try:
631           
632            query = session.query(Request).join(Job).filter(Request.user_id==userId)
633           
634            if runningJobs == True:
635                where = sqlalchemy.or_(Job.status==STATUS.ACCEPTED, Job.status==STATUS.STARTED)
636            else:
637                where = sqlalchemy.or_(Job.status==STATUS.FAILED, Job.status==STATUS.COMPLETED)
638           
639            query = query.filter(where)
640           
641            if limit != None:
642                query = query.order_by(Job.created.desc()).limit(limit)
643           
644            log.debug("query = %s" % (query,))
645
646            for req in query:
647
648                job = req.job
649
650                log.debug("returning context for job %s" % (job.job_id,))
651
652                try:
653                    c = self._getContextFromDir(job.process_dir)
654                except:
655                    log.exception('Failed to open ProcessContext from directory %s' % job.process_dir)
656                    continue
657
658                log.debug("Returning context for dir %s" % (job.process_dir,))
659
660                yield c
661                c.close()
662               
663        finally:
664            session.close()
665
666    def deleteRequest(self, requestId):
667       
668        session = self.getSession()
669
670        try:
671           
672            req = self._getRequest(requestId, session)
673           
674            if req == None:
675                raise ValueError("Cannot delete request with id = %s, no such request found." % requestId)
676            else:
677                session.delete(req)
678                   
679            job = req.job
680           
681            #if the job has no requests then delete it
682            if len(job.requests) == 0:
683                session.delete(job)
684           
685            session.commit()
686        finally:
687            session.close()
688               
689    def deleteJob(self, jobId):
690       
691        session = self.getSession()
692        try:
693            job = self._getJob(jobId, session)
694           
695            if job == None:
696                raise ValueError("Cannot delete job with id = %s, no such job found." % jobId)
697
698            for req in job.requests:
699                session.delete(req)
700           
701            for cacheEntry in self._getCacheEntriesForJob(job, session):
702                session.delete(cacheEntry)
703           
704            session.delete(job)
705           
706            session.commit()
707        finally:
708            session.close()
709           
710    def _getCacheEntriesForJob(self, job, session):
711       
712        query = session.query(CacheEntry).filter(CacheEntry.job_id == job.job_id)
713        return query.all()
714       
715
716    def _logRequests(self):
717        """
718        Writes the details of all the requests in the database to the log.
719       
720        This is usually only usfull while running tests as the output can get
721        very long.
722        """       
723        session = self.getSession()
724        try:
725            str = "current requests:\n"
726            reqCount = 0
727            for req in session.query(Request).all():
728                reqCount += 1
729                str +="    req.id=%s, req.job_id=%s, req.job.status=%s req.user_id=%s\n" \
730                         % (req.id, req.job.job_id, req.job.status, req.user_id)
731                         
732            str += "%s requests found\n" % (reqCount,)
733           
734            log.debug(str)
735        finally:
736            session.close() 
737
738    def DEPRECATED_getUsername(self, context):
739        """
740        Extracts the 'Username' value from the context.inputs dictionary, if no entry is found then
741        a ValueError is raised.
742       
743        The username is also checked against the MAX_USER_ID_LENGTH value before being returned.
744        """       
745        if context.inputs.has_key('Username'):
746            user_id = context.inputs['Username']
747        else:
748            raise ValueError('Username not specified')
749
750        if len(user_id) > MAX_USER_ID_LENGTH:
751            raise ValueError('Maximum Username length is %d, %d set in context' % (MAX_USER_ID_LENGTH, len(user_id)))
752
753        return user_id
754
755    def _getContextFromDir(self, processDir):
756        return ProcessContext(processDir).open()
757
# Manual smoke test, run only when the module is executed as a script.
# NOTE(review): neither a JobManager class nor a getJobStatus method is
# defined in the visible part of this module (the class here is
# RequestManager, whose constructor takes an engine), so this looks like a
# leftover from an earlier version - confirm before relying on it.
if __name__ == '__main__':
    jm = JobManager('/home/users/pnorton/svn/wps-ng/proc_outputs')
    print jm.getJobStatus('job1')
Note: See TracBrowser for help on using the repository browser.