1 | #!/usr/bin/python
|
---|
2 |
|
---|
3 | #############################################
|
---|
4 | #Author: Damiano Barboni <barboni@meeo.it>
|
---|
5 | #Version:
|
---|
6 | #Description: Perform parallel read-only requests to RASDAMAN or PETASCOPE
|
---|
7 | #Usage: ./test_parallel_request.py APP N
|
---|
8 | # APP can be
|
---|
9 | # "ras" for RASDAMAN
|
---|
10 | # "wcs" for PETASCOPE wcs2.0
|
---|
11 | # "wcps" for PETASCOPE wcps abstract syntax
|
---|
12 | # N is the number of requests generated
|
---|
13 |
|
---|
14 | #Changelog: Tue Apr 17 12:35:20 CEST 2012
|
---|
15 | # Submit requests also to wcps (both xml or abstract syntax)
|
---|
16 | #############################################
|
---|
17 |
|
---|
18 | import httplib
|
---|
19 | import urllib
|
---|
20 | import sys
|
---|
21 | import threading
|
---|
22 | import time
|
---|
23 | import commands
|
---|
24 | import traceback
|
---|
25 |
|
---|
26 | ####################################################
|
---|
27 | ######## global variables definition ###############
|
---|
28 | ####################################################
|
---|
29 |
|
---|
30 | WCS_host = "rasdaman"
|
---|
31 | WCS_port = 8080
|
---|
32 | WCS_url = "/petascope/wcs2?"
|
---|
33 | WCS_service = "WCS"
|
---|
34 | WCS_version = "2.0.0"
|
---|
35 |
|
---|
36 | WCPS_url = "/petascope/wcps"
|
---|
37 |
|
---|
38 | CoverageId = "NIR"
|
---|
39 |
|
---|
40 | Output_File_Path = "/home/damiano"
|
---|
41 |
|
---|
42 | #################################
|
---|
43 | #global variables for subset/trim
|
---|
44 | _min_x = 0
|
---|
45 | _max_x = 200
|
---|
46 | _min_y = 0
|
---|
47 | _max_y = 200
|
---|
48 | ###############################
|
---|
49 | ######################################################
|
---|
50 |
|
---|
51 | class GetCoverage():
|
---|
52 | #########################################
|
---|
53 | ############## INIT CLASS ###############
|
---|
54 | #########################################
|
---|
55 | def __init__(self, **kw):
|
---|
56 | #initlog
|
---|
57 | self.Request = "GetCoverage"
|
---|
58 | self.service = WCS_service
|
---|
59 | self.url = WCS_url
|
---|
60 | self.subset = ""
|
---|
61 |
|
---|
62 | #set coverage id
|
---|
63 | self.CoverageId = kw['CoverageId']
|
---|
64 |
|
---|
65 | #set wcs version
|
---|
66 | self.version = WCS_version
|
---|
67 |
|
---|
68 | #########################################
|
---|
69 | ############# SET SUBSET METHOD #########
|
---|
70 | #########################################
|
---|
71 | def setSubset(self, axis, minVal, maxVal):
|
---|
72 | #if the axis is different than x or y log a warning message becuse the axis was not tested
|
---|
73 | if axis.lower()!="x" and axis.lower()!="y" and axis.lower()!="t":
|
---|
74 | self.LOG.warning("%s the axis %s is different than x-y-t and was not tested" %(self.Request, axis))
|
---|
75 | #append new subset to self.subset string
|
---|
76 | if axis.lower()=="t":
|
---|
77 | self.subset += "subset%s=%s(%s,%s)&" %(axis.upper(), axis.lower(), minVal, maxVal)
|
---|
78 | else:#axis.lower()=="x" OR axis.lower()=="y"
|
---|
79 | self.subset += "subset%s=%s(%d,%d)&" %(axis.upper(), axis.lower(), minVal, maxVal)
|
---|
80 |
|
---|
81 | return self.subset
|
---|
82 |
|
---|
83 | #########################################
|
---|
84 | ############ Get data METHOD ############
|
---|
85 | #########################################
|
---|
86 |
|
---|
87 | def getData(self, **kw):
|
---|
88 | #get host coverage id
|
---|
89 | host = kw['host']
|
---|
90 | port = kw['port']
|
---|
91 |
|
---|
92 | #create string
|
---|
93 | request_string = WCS_url + "service=WCS" + "&Request=" + self.Request + "&version=" + self.version + "&CoverageId=" + self.CoverageId + "&" + self.subset
|
---|
94 |
|
---|
95 | conn = WCSRequestGet(host, port)
|
---|
96 | xml = conn.execute(request_string)
|
---|
97 |
|
---|
98 | return xml
|
---|
99 |
|
---|
100 | class WCPSClient():
|
---|
101 | #########################################
|
---|
102 | ############## INIT CLASS ###############
|
---|
103 | #########################################
|
---|
104 | def __init__(self, **kw):
|
---|
105 |
|
---|
106 | #init subset
|
---|
107 | try:
|
---|
108 | self.subset = kw['subset']
|
---|
109 | except:
|
---|
110 | self.subset = {'x' : None,
|
---|
111 | 'y' : None,
|
---|
112 | 't' : None}
|
---|
113 |
|
---|
114 | self.output = "csv"
|
---|
115 | self.CoverageId = kw['CoverageId']
|
---|
116 |
|
---|
117 | #############################################
|
---|
118 | ############# WCPS QUERY GENERATION #########
|
---|
119 | #############################################
|
---|
120 | def queryGen(self):
|
---|
121 | self.query = ""
|
---|
122 | try:
|
---|
123 | #set coverage id
|
---|
124 | self.query += 'for data in (%s)' %(self.CoverageId)
|
---|
125 |
|
---|
126 | #set encode
|
---|
127 | self.query += ' return encode( data'
|
---|
128 |
|
---|
129 | ######################
|
---|
130 | # TRIM #
|
---|
131 | ######################
|
---|
132 | subset_query = ""
|
---|
133 | try:
|
---|
134 | if self.subset['x']:
|
---|
135 | subset_query += ' x:"CRS:1"(%s:%s),' %(str(self.subset['x']['min']), str(self.subset['x']['max']))
|
---|
136 | except:
|
---|
137 | None
|
---|
138 | try:
|
---|
139 | if self.subset['y']:
|
---|
140 | subset_query += ' y:"CRS:1"(%s:%s)' %(str(self.subset['y']['min']), str(self.subset['y']['max']))
|
---|
141 | except:
|
---|
142 | None
|
---|
143 |
|
---|
144 | #if subset is define add the "trim" operator to the WCPS query
|
---|
145 | if subset_query:
|
---|
146 | self.query = self.query.replace("encode(", "encode( trim(")
|
---|
147 | subset_query = ', { ' + subset_query + ' })'
|
---|
148 |
|
---|
149 | self.query += subset_query
|
---|
150 |
|
---|
151 | #set output format
|
---|
152 | self.query += ', "%s")' %(self.output)
|
---|
153 |
|
---|
154 | except:
|
---|
155 | None
|
---|
156 |
|
---|
157 | return self.query
|
---|
158 |
|
---|
159 | #############################################
|
---|
160 | ############# WCPS QUERY GENERATION #########
|
---|
161 | #############################################
|
---|
162 | def xmlGen(self):
|
---|
163 | #XML is not currently implemented
|
---|
164 | return ""
|
---|
165 |
|
---|
166 | ##################################################
|
---|
167 | ############ Post request data METHOD ############
|
---|
168 | ##################################################
|
---|
169 |
|
---|
170 | def postRequest(self, **kw):
|
---|
171 | #get host coverage id
|
---|
172 | host = kw['host']
|
---|
173 | port = kw['port']
|
---|
174 | url = WCPS_url
|
---|
175 | WCPS_type = kw['type']
|
---|
176 |
|
---|
177 | #get ProcessCoverage request
|
---|
178 | if WCPS_type == "query":
|
---|
179 | WCPS_request = self.queryGen()
|
---|
180 |
|
---|
181 | elif WCPS_type == "xml":
|
---|
182 | WCPS_request = self.xmlGen()
|
---|
183 |
|
---|
184 | #submit request
|
---|
185 | try:
|
---|
186 | conn = WCPSRequestPost(host, port, url)
|
---|
187 | ProcessedCoverage = conn.execute(WCPS_type, WCPS_request)
|
---|
188 | except:
|
---|
189 | print ("Some errors occur retrivin ProcessedCoverage data.\n%s" %(traceback.format_exc()))
|
---|
190 |
|
---|
191 | return ProcessedCoverage
|
---|
192 |
|
---|
193 | #implement http reguest to WCS server
|
---|
194 | class WCSRequestGet():
|
---|
195 | def __init__(self, host, port = WCS_port):
|
---|
196 | self.host = host
|
---|
197 | self.port = port
|
---|
198 |
|
---|
199 | def execute(self, wcs_request_string):
|
---|
200 | #init connection
|
---|
201 | conn = httplib.HTTPConnection(self.host+":"+str(self.port))
|
---|
202 | conn.request("GET", wcs_request_string)
|
---|
203 | r = conn.getresponse()
|
---|
204 |
|
---|
205 | #check if response status is 200
|
---|
206 | r_status = r.status
|
---|
207 | if 200 <= r_status <= 299:
|
---|
208 | xml = r.read()
|
---|
209 | else:
|
---|
210 | xml = "ERROR %d" %(r_status)
|
---|
211 | print ("Unable to get data from WCS server. Bad response status N %d returned by WCS server" %(r_status))
|
---|
212 |
|
---|
213 | conn.close()
|
---|
214 | return xml
|
---|
215 |
|
---|
216 | #implement http reguest POST to WCPS server
|
---|
217 | class WCPSRequestPost():
|
---|
218 | def __init__(self, host, port, url=WCPS_url):
|
---|
219 | self.host = host
|
---|
220 | self.port = port
|
---|
221 | self.url = url
|
---|
222 |
|
---|
223 | def execute(self, parameter, request_string):
|
---|
224 |
|
---|
225 | #TAR.REP Request
|
---|
226 | headers = {"Content-type": "application/x-www-form-urlencoded",
|
---|
227 | "Accept": "text/plain"}
|
---|
228 |
|
---|
229 | params = urllib.urlencode({parameter : request_string})
|
---|
230 | #established connection
|
---|
231 | conn = httplib.HTTPConnection(self.host, self.port)
|
---|
232 | conn.request("POST", self.url, params, headers)
|
---|
233 |
|
---|
234 | #get data
|
---|
235 | response = conn.getresponse()
|
---|
236 | processedCoverage = response.read()
|
---|
237 | #close connection
|
---|
238 | conn.close()
|
---|
239 |
|
---|
240 | return processedCoverage
|
---|
241 |
|
---|
242 | class requestThread(threading.Thread):
|
---|
243 | def __init__(self,application, i):
|
---|
244 | threading.Thread.__init__(self)
|
---|
245 | self.application = application
|
---|
246 | self.th_number = i
|
---|
247 |
|
---|
248 | #run method
|
---|
249 | def run(self):
|
---|
250 | start = time.time()
|
---|
251 | print "Thread %d STARTED" %(self.th_number)
|
---|
252 | if self.application == "ras":
|
---|
253 | #request to rasdaman
|
---|
254 | rasdaman_request = 'rasql -q "select csv(n[%d:%d,%d:%d]) from NIR as n" --out string' %(_min_x, _max_x, _min_y, _max_y)
|
---|
255 | data_Out = str(commands.getstatusoutput(rasdaman_request))
|
---|
256 |
|
---|
257 | filename = "%s/%s_output_%d.xml" %(Output_File_Path, self.application, self.th_number)
|
---|
258 |
|
---|
259 | elif self.application == "wcs":
|
---|
260 | #request to petascope
|
---|
261 | obj = GetCoverage(CoverageId=CoverageId)
|
---|
262 | #set subset
|
---|
263 | obj.setSubset("x", _min_x, _max_x)
|
---|
264 | obj.setSubset("y", _min_y, _max_y)
|
---|
265 |
|
---|
266 | #get xml response
|
---|
267 | data_Out = obj.getData(host=WCS_host, port=WCS_port)
|
---|
268 |
|
---|
269 | filename = "%s/%s_output_%d.xml" %(Output_File_Path, self.application, self.th_number)
|
---|
270 |
|
---|
271 | elif self.application == "wcps":
|
---|
272 | #request to petascope
|
---|
273 | obj = WCPSClient(CoverageId=CoverageId)
|
---|
274 |
|
---|
275 | #set subset
|
---|
276 | obj.subset['x'] = {'min':_min_x, 'max':_max_x, 'EPSG':1}
|
---|
277 | obj.subset['y'] = {'min':_min_y, 'max':_max_y, 'EPSG':1}
|
---|
278 |
|
---|
279 | #get xml response
|
---|
280 | data_Out = obj.postRequest(host=WCS_host, port=WCS_port, type="query")
|
---|
281 |
|
---|
282 | filename = "%s/%s_output_%d.txt" %(Output_File_Path, self.application, self.th_number)
|
---|
283 |
|
---|
284 | #write output to file
|
---|
285 | out_file = open(filename,"w")
|
---|
286 | out_file.write(data_Out)
|
---|
287 | out_file.close()
|
---|
288 |
|
---|
289 | print "Thread %d ENDED in %f seconds" %(self.th_number,time.time()-start)
|
---|
290 | sys.exit(0)
|
---|
291 |
|
---|
292 | if __name__ == '__main__':
|
---|
293 | try:
|
---|
294 | application = sys.argv[1]
|
---|
295 | req_number = int(sys.argv[2],10)
|
---|
296 | if (application != "ras" or application != "wcs" or application != "wcps") and req_number < 1:
|
---|
297 | raise
|
---|
298 | except:
|
---|
299 | print "ERROR bad arguments. USAGE:\n\tpython %s APP N" %(sys.argv[0])
|
---|
300 | print '\t\tAPP can be:'
|
---|
301 | print '\t\t\t\t"ras" for RASDAMAN,'
|
---|
302 | print '\t\t\t\t"wcs" for PETASCOPE wcs2.0,'
|
---|
303 | print '\t\t\t\t"wcps" for PETASCOPE wcps abstract syntax\n'
|
---|
304 | print '\t\tN is the number of requests generated'
|
---|
305 | sys.exit(65)
|
---|
306 |
|
---|
307 | #thread generation
|
---|
308 | thread_list = []
|
---|
309 | for i in range(0, req_number):
|
---|
310 | thread_list.append(requestThread(application, i))
|
---|
311 | #thread execution
|
---|
312 | thread_list[i].start()
|
---|
313 |
|
---|
314 | #Wait for the thread_list tasks to finish
|
---|
315 | for i in range(0,len(thread_list)):
|
---|
316 | thread_list[i].join()
|
---|
317 |
|
---|
318 | print "COMPLETE!"
|
---|
319 |
|
---|
320 | sys.exit(0)
|
---|
321 |
|
---|