Ticket #133: test_parallel_requests.2.py

File test_parallel_requests.2.py, 10.7 KB (added by barboni@…, 12 years ago)

new version with wcps support

Line 
1#!/usr/bin/python
2
3#############################################
4#Author: Damiano Barboni <barboni@meeo.it>
5#Version:
6#Description: Perform parallel read-only requests to RASDAMAN or PETASCOPE
7#Usage: ./test_parallel_request.py APP N
8# APP can be
9# "ras" for RASDAMAN
10# "wcs" for PETASCOPE wcs2.0
11# "wcps" for PETASCOPE wcps abstract syntax
12# N is the number of requests generated
13
14#Changelog: Tue Apr 17 12:35:20 CEST 2012
15# Submit requests also to wcps (both xml or abstract syntax)
16#############################################
17
18import httplib
19import urllib
20import sys
21import threading
22import time
23import commands
24import traceback
25
26####################################################
27######## global variables definition ###############
28####################################################
29
30WCS_host = "rasdaman"
31WCS_port = 8080
32WCS_url = "/petascope/wcs2?"
33WCS_service = "WCS"
34WCS_version = "2.0.0"
35
36WCPS_url = "/petascope/wcps"
37
38CoverageId = "NIR"
39
40Output_File_Path = "/home/damiano"
41
42#################################
43#global variables for subset/trim
44_min_x = 0
45_max_x = 200
46_min_y = 0
47_max_y = 200
48###############################
49######################################################
50
51class GetCoverage():
52 #########################################
53 ############## INIT CLASS ###############
54 #########################################
55 def __init__(self, **kw):
56 #initlog
57 self.Request = "GetCoverage"
58 self.service = WCS_service
59 self.url = WCS_url
60 self.subset = ""
61
62 #set coverage id
63 self.CoverageId = kw['CoverageId']
64
65 #set wcs version
66 self.version = WCS_version
67
68 #########################################
69 ############# SET SUBSET METHOD #########
70 #########################################
71 def setSubset(self, axis, minVal, maxVal):
72 #if the axis is different than x or y log a warning message becuse the axis was not tested
73 if axis.lower()!="x" and axis.lower()!="y" and axis.lower()!="t":
74 self.LOG.warning("%s the axis %s is different than x-y-t and was not tested" %(self.Request, axis))
75 #append new subset to self.subset string
76 if axis.lower()=="t":
77 self.subset += "subset%s=%s(%s,%s)&" %(axis.upper(), axis.lower(), minVal, maxVal)
78 else:#axis.lower()=="x" OR axis.lower()=="y"
79 self.subset += "subset%s=%s(%d,%d)&" %(axis.upper(), axis.lower(), minVal, maxVal)
80
81 return self.subset
82
83 #########################################
84 ############ Get data METHOD ############
85 #########################################
86
87 def getData(self, **kw):
88 #get host coverage id
89 host = kw['host']
90 port = kw['port']
91
92 #create string
93 request_string = WCS_url + "service=WCS" + "&Request=" + self.Request + "&version=" + self.version + "&CoverageId=" + self.CoverageId + "&" + self.subset
94
95 conn = WCSRequestGet(host, port)
96 xml = conn.execute(request_string)
97
98 return xml
99
100class WCPSClient():
101 #########################################
102 ############## INIT CLASS ###############
103 #########################################
104 def __init__(self, **kw):
105
106 #init subset
107 try:
108 self.subset = kw['subset']
109 except:
110 self.subset = {'x' : None,
111 'y' : None,
112 't' : None}
113
114 self.output = "csv"
115 self.CoverageId = kw['CoverageId']
116
117 #############################################
118 ############# WCPS QUERY GENERATION #########
119 #############################################
120 def queryGen(self):
121 self.query = ""
122 try:
123 #set coverage id
124 self.query += 'for data in (%s)' %(self.CoverageId)
125
126 #set encode
127 self.query += ' return encode( data'
128
129 ######################
130 # TRIM #
131 ######################
132 subset_query = ""
133 try:
134 if self.subset['x']:
135 subset_query += ' x:"CRS:1"(%s:%s),' %(str(self.subset['x']['min']), str(self.subset['x']['max']))
136 except:
137 None
138 try:
139 if self.subset['y']:
140 subset_query += ' y:"CRS:1"(%s:%s)' %(str(self.subset['y']['min']), str(self.subset['y']['max']))
141 except:
142 None
143
144 #if subset is define add the "trim" operator to the WCPS query
145 if subset_query:
146 self.query = self.query.replace("encode(", "encode( trim(")
147 subset_query = ', { ' + subset_query + ' })'
148
149 self.query += subset_query
150
151 #set output format
152 self.query += ', "%s")' %(self.output)
153
154 except:
155 None
156
157 return self.query
158
159 #############################################
160 ############# WCPS QUERY GENERATION #########
161 #############################################
162 def xmlGen(self):
163 #XML is not currently implemented
164 return ""
165
166 ##################################################
167 ############ Post request data METHOD ############
168 ##################################################
169
170 def postRequest(self, **kw):
171 #get host coverage id
172 host = kw['host']
173 port = kw['port']
174 url = WCPS_url
175 WCPS_type = kw['type']
176
177 #get ProcessCoverage request
178 if WCPS_type == "query":
179 WCPS_request = self.queryGen()
180
181 elif WCPS_type == "xml":
182 WCPS_request = self.xmlGen()
183
184 #submit request
185 try:
186 conn = WCPSRequestPost(host, port, url)
187 ProcessedCoverage = conn.execute(WCPS_type, WCPS_request)
188 except:
189 print ("Some errors occur retrivin ProcessedCoverage data.\n%s" %(traceback.format_exc()))
190
191 return ProcessedCoverage
192
193#implement http reguest to WCS server
194class WCSRequestGet():
195 def __init__(self, host, port = WCS_port):
196 self.host = host
197 self.port = port
198
199 def execute(self, wcs_request_string):
200 #init connection
201 conn = httplib.HTTPConnection(self.host+":"+str(self.port))
202 conn.request("GET", wcs_request_string)
203 r = conn.getresponse()
204
205 #check if response status is 200
206 r_status = r.status
207 if 200 <= r_status <= 299:
208 xml = r.read()
209 else:
210 xml = "ERROR %d" %(r_status)
211 print ("Unable to get data from WCS server. Bad response status N %d returned by WCS server" %(r_status))
212
213 conn.close()
214 return xml
215
216#implement http reguest POST to WCPS server
217class WCPSRequestPost():
218 def __init__(self, host, port, url=WCPS_url):
219 self.host = host
220 self.port = port
221 self.url = url
222
223 def execute(self, parameter, request_string):
224
225 #TAR.REP Request
226 headers = {"Content-type": "application/x-www-form-urlencoded",
227 "Accept": "text/plain"}
228
229 params = urllib.urlencode({parameter : request_string})
230 #established connection
231 conn = httplib.HTTPConnection(self.host, self.port)
232 conn.request("POST", self.url, params, headers)
233
234 #get data
235 response = conn.getresponse()
236 processedCoverage = response.read()
237 #close connection
238 conn.close()
239
240 return processedCoverage
241
242class requestThread(threading.Thread):
243 def __init__(self,application, i):
244 threading.Thread.__init__(self)
245 self.application = application
246 self.th_number = i
247
248 #run method
249 def run(self):
250 start = time.time()
251 print "Thread %d STARTED" %(self.th_number)
252 if self.application == "ras":
253 #request to rasdaman
254 rasdaman_request = 'rasql -q "select csv(n[%d:%d,%d:%d]) from NIR as n" --out string' %(_min_x, _max_x, _min_y, _max_y)
255 data_Out = str(commands.getstatusoutput(rasdaman_request))
256
257 filename = "%s/%s_output_%d.xml" %(Output_File_Path, self.application, self.th_number)
258
259 elif self.application == "wcs":
260 #request to petascope
261 obj = GetCoverage(CoverageId=CoverageId)
262 #set subset
263 obj.setSubset("x", _min_x, _max_x)
264 obj.setSubset("y", _min_y, _max_y)
265
266 #get xml response
267 data_Out = obj.getData(host=WCS_host, port=WCS_port)
268
269 filename = "%s/%s_output_%d.xml" %(Output_File_Path, self.application, self.th_number)
270
271 elif self.application == "wcps":
272 #request to petascope
273 obj = WCPSClient(CoverageId=CoverageId)
274
275 #set subset
276 obj.subset['x'] = {'min':_min_x, 'max':_max_x, 'EPSG':1}
277 obj.subset['y'] = {'min':_min_y, 'max':_max_y, 'EPSG':1}
278
279 #get xml response
280 data_Out = obj.postRequest(host=WCS_host, port=WCS_port, type="query")
281
282 filename = "%s/%s_output_%d.txt" %(Output_File_Path, self.application, self.th_number)
283
284 #write output to file
285 out_file = open(filename,"w")
286 out_file.write(data_Out)
287 out_file.close()
288
289 print "Thread %d ENDED in %f seconds" %(self.th_number,time.time()-start)
290 sys.exit(0)
291
292if __name__ == '__main__':
293 try:
294 application = sys.argv[1]
295 req_number = int(sys.argv[2],10)
296 if (application != "ras" or application != "wcs" or application != "wcps") and req_number < 1:
297 raise
298 except:
299 print "ERROR bad arguments. USAGE:\n\tpython %s APP N" %(sys.argv[0])
300 print '\t\tAPP can be:'
301 print '\t\t\t\t"ras" for RASDAMAN,'
302 print '\t\t\t\t"wcs" for PETASCOPE wcs2.0,'
303 print '\t\t\t\t"wcps" for PETASCOPE wcps abstract syntax\n'
304 print '\t\tN is the number of requests generated'
305 sys.exit(65)
306
307 #thread generation
308 thread_list = []
309 for i in range(0, req_number):
310 thread_list.append(requestThread(application, i))
311 #thread execution
312 thread_list[i].start()
313
314 #Wait for the thread_list tasks to finish
315 for i in range(0,len(thread_list)):
316 thread_list[i].join()
317
318 print "COMPLETE!"
319
320 sys.exit(0)
321