source: cows_wps/trunk/cows_wps/model/orm/classes.py @ 5615

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/model/orm/classes.py@5615
Revision 5615, 3.6 KB checked in by spascoe, 11 years ago (diff)

COWS WPS package copied from
 http://proj.badc.rl.ac.uk/svn/dcip/cows-wps/trunk.

This is a stripped down version of the DDP WPS. Some features are
removed and others have been deactivated until we reimplement them in a
more generic way.

Line 
1"""
2Classes mapped to database tables.
3
4"""
5
6from sqlalchemy.orm import mapper, relation
7import datetime as dt
8import datetime
9import md5
10
11from cows_wps.process_handler.context.process_status import STATUS
12
13# Relative imports
14import tables
15
16class JOB_TYPE:
17    UNKNOWN = 0
18    IN_THREAD = 1
19    SCHEDULED = 2
20
21class Request(object):
22    """
23    @ivar id: The RequestID as given as a DataInputs parameter
24    @ivar user_id: The UserID as given as a DataInputs parameter
25    @ivar job: The job generated by this request
26    """
27   
28    def __init__(self, id, user_id):
29        self.id = id
30        self.user_id = user_id
31       
32    def __eq__(self, request):
33
34        if request.__class__ != Request:
35            return False
36
37        for attr in ['id', 'user_id', 'job_id', 'job']:
38            if getattr(self, attr) != getattr(request, attr):
39                return False
40       
41        return True
42       
43    def __ne__(self, request):
44        return not self == request
45   
46class Job(object):
47    """
48    @ivar status: job status from STATUS.*
49    @ivar process_dir: filesystem location of the process details. 
50        Note this is implemented in subclasses.
51    """
52   
53    def __init__(self, process, process_dir):
54        self.process = process
55        self.process_dir = process_dir
56        self.status = STATUS.NONE
57        self.created = dt.datetime.now()
58   
59    def updateStatus(self, status):
60        """
61        Set self.status and record self.last_polled as now.
62        """
63        self.status = status
64       
65    def __eq__(self, job):
66
67        if job.__class__ != self.__class__:
68            return False
69
70        for attr in ['process', 'process_dir', 'status']:
71            if getattr(self, attr) != getattr(job, attr):
72                return False
73       
74        return True
75       
76    def __ne__(self, job):
77        return not self == job             
78   
79               
80class SGEJob(Job):
81    def __init__(self, sge_id, queue, process, process_dir):
82        super(SGEJob, self).__init__(process, process_dir)
83        self.sge_id = sge_id
84        self.sge_queue = queue
85       
86    def updateStatus(self, status):
87        """
88        Set self.status and record self.last_polled as now.
89        """
90        self.status = status
91        self.last_polled = datetime.datetime.now()
92       
93    def __eq__(self, sgeJob):
94
95        if sgeJob.__class__ != SGEJob:
96            return False
97
98        for attr in ['sge_id', 'sge_queue', 'last_polled', 'process', 'process_dir', 'status']:
99            if getattr(self, attr) != getattr(sgeJob, attr):
100                return False
101       
102        return True
103       
104    def __ne__(self, sgeJob):
105        return not self == sgeJob
106           
107class InThreadJob(Job):
108    """So far no extra information needs storing for InThreadJob over what
109    is included in Job.
110    """
111    pass
112
113
114class CacheEntry(object):
115    def __init__(self, key, job):
116        self.key_hash = self.makeKeyHash(key)
117        self.key = key
118        self.job = job
119
120    @classmethod
121    def makeKeyHash(cls, key):
122        h = md5.md5()
123        h.update(key)
124        return h.digest()
125       
126
127#
128# Table mapping
129#
130mapper(Request, tables.request,
131       properties={'job': relation(Job, backref='requests')}
132)
133mapper(Job, tables.job, 
134       polymorphic_on=tables.job.c.type,
135       polymorphic_identity=JOB_TYPE.UNKNOWN,
136)
137mapper(SGEJob, tables.sge_job, 
138       inherits=Job,
139       polymorphic_identity=JOB_TYPE.SCHEDULED,
140)
141mapper(InThreadJob, tables.job,
142       inherits=Job,
143       polymorphic_identity=JOB_TYPE.IN_THREAD
144)
145mapper(CacheEntry, tables.cache,
146       properties={'job': relation(Job)}
147)
Note: See TracBrowser for help on using the repository browser.