source: cows_wps/trunk/cows_wps/tests/test_process_code/wait_for_all_files_to_be_deleted.py @ 7014

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg/cows_wps/trunk/cows_wps/tests/test_process_code/wait_for_all_files_to_be_deleted.py@7014
Revision 7014, 3.5 KB checked in by astephen, 10 years ago (diff)

renamed ddp_process_support to process_support

Line 
1#!/usr/bin/env python
2#$ -S /usr/bin/python
3
4import sys
5import os
6import time
7import threading
8import logging
9from datetime import datetime
10
11
12import cows_wps.process_handler.process_support as process_support
13import cows_wps.tests.test_process_code.wait_for_deletion as wait_for_deletion
14from cows_wps.process_handler.fileset import FileSet
15from cows_wps.process_handler.context.process_status import STATUS
16
17FILE_LIST = ['file_%02i' % (i,) for i in range (1,6)]
18PROGRESS_FILE_NAME = "prog.txt"
19PROGRESS_BEFORE_RUN = 0.0
20PROGRESS_AFTER_RUN = 100.0
21
22
23class WaitForAllFilesToBeDeleted(object):
24
25    def __init__(self):
26        self._progress = 0.0
27        self.progressFileName = PROGRESS_FILE_NAME
28       
29        self.progressBeforeRunModule = PROGRESS_BEFORE_RUN
30        self.progressAfterRunModule = PROGRESS_AFTER_RUN
31
32    def __call__(self, context):
33       
34        startTime = time.time()
35       
36        filesRemaining = FILE_LIST
37       
38        self.log = context.log
39        self.log.info("Started")
40       
41        self.processDir = context.processDir
42
43        context.setStatus(STATUS.STARTED, "Waiting...", self.progressBeforeRunModule)
44       
45        filePathList = []
46        for f in FILE_LIST:
47            path = os.path.join(self.processDir, f)
48            self._createFile(path)
49            filePathList.append(path)
50       
51        progressFilePath = os.path.join(self.processDir, self.progressFileName)
52       
53        progReportFn = self.makeProgressReportFn(context)
54       
55        self.log.info("USE_PROGRESS_FILE = %s" % (USE_PROGRESS_FILE,))
56           
57        self.log.info("Using report function method")
58        wait_for_deletion.waitForFilesToBeDeletedWithReportFn(filePathList, progReportFn)
59           
60           
61        assert context.percentComplete == self.progressAfterRunModule, "Context percent complete isn't %s but %s" % (self.progressAfterRunModule, context.percentComplete)
62
63        context.setStatus(STATUS.COMPLETED, "Process is complete")       
64        completionTime = time.time()
65        process_support.updateJobDetailsAfterCompletion(context, startTime, completionTime)
66
67
68
69    def makeProgressReportFn(self, context):
70       
71        scale = (self.progressAfterRunModule - self.progressBeforeRunModule) / 100.0
72       
73        def reportProgress(progress):
74            self.log.info("Progress report function recieved %s" % progress)
75            context.setStatus(STATUS.STARTED, "Waiting...", self.progressBeforeRunModule + scale* progress)
76            context.save()
77       
78        return reportProgress
79
80    def _createFile(self, fullPath):
81        """
82        Creates the file that this process will monitor in the directory given,
83        the file created will be left empty.
84        """
85        fout = open(fullPath, 'w')
86        fout.close()
87        assert(os.path.exists(fullPath))
88        return fullPath       
89
90
91    def dryRun(self, context):
92       
93        context.log.info('WaitForAllFilesToBeDeleted dryrun !')
94       
95        estimatedDuration = context.inputs['EstimatedDuration']
96        context.outputs['job_details']['estimated_duration'] = estimatedDuration
97
98        context.outputs['FileSet'] = FileSet()       
99
100        context.setStatus(STATUS.ACCEPTED, "Process execution request has been accepted", 0)
101        process_support.updateJobDetailsAfterDryRun(context, estimatedDuration)
102
103   
104    def onProcessQuit(self):
105        quitDateTime = datetime.fromtimestamp(time.time())
106        self.log.warning("onProcessQuit called at %s" % (quitDateTime.isoformat(' '),))
107        sys.stdout.flush()
Note: See TracBrowser for help on using the repository browser.