| 1 | #!/usr/bin/python
|
|---|
| 2 |
|
|---|
| 3 | #############################################
|
|---|
| 4 | #Author: Damiano Barboni <barboni@meeo.it>
|
|---|
| 5 | #Version:
|
|---|
| 6 | #Description: Perform parallel read-only requests to RASDAMAN or PETASCOPE
|
|---|
| 7 | #Usage: ./test_parallel_request.py APP N
|
|---|
| 8 | # APP can be
|
|---|
| 9 | # "ras" for RASDAMAN
|
|---|
| 10 | # "wcs" for PETASCOPE wcs2.0
|
|---|
| 11 | # "wcps" for PETASCOPE wcps abstract syntax
|
|---|
| 12 | # N is the number of requests generated
|
|---|
| 13 |
|
|---|
| 14 | #Changelog: Tue Apr 17 12:35:20 CEST 2012
|
|---|
| 15 | # Submit requests also to wcps (both xml or abstract syntax)
|
|---|
| 16 | #############################################
|
|---|
| 17 |
|
|---|
| 18 | import httplib
|
|---|
| 19 | import urllib
|
|---|
| 20 | import sys
|
|---|
| 21 | import threading
|
|---|
| 22 | import time
|
|---|
| 23 | import commands
|
|---|
| 24 | import traceback
|
|---|
| 25 |
|
|---|
# ---------------------------------------------------------------------------
# Module-level configuration
# ---------------------------------------------------------------------------

# PETASCOPE WCS 2.0 endpoint and request parameters
WCS_host = "rasdaman"
WCS_port = 8080
WCS_url = "/petascope/wcs2?"
WCS_service = "WCS"
WCS_version = "2.0.0"

# PETASCOPE WCPS endpoint path (requested on the same host and port)
WCPS_url = "/petascope/wcps"

# Coverage identifier handed to the WCS and WCPS clients
CoverageId = "NIR"

# Directory where every request thread writes its output file
Output_File_Path = "/home/damiano"

# Bounds of the subset/trim applied by every request
_min_x, _max_x = 0, 200
_min_y, _max_y = 0, 200
| 50 |
|
|---|
class GetCoverage():
    """Build and submit a WCS GetCoverage request.

    Subsets are accumulated with setSubset() and the request is sent with
    getData().
    """

    def __init__(self, **kw):
        """Initialise the request parameters.

        Keyword arguments:
        CoverageId -- identifier of the coverage to request (required, a
                      KeyError is raised when it is missing)
        """
        self.Request = "GetCoverage"
        self.service = WCS_service
        self.url = WCS_url
        self.version = WCS_version
        self.subset = ""
        self.CoverageId = kw['CoverageId']

    def setSubset(self, axis, minVal, maxVal):
        """Append a subset parameter for axis and return the accumulated string.

        axis   -- axis name; only x, y and t have been tested
        minVal -- lower bound of the subset
        maxVal -- upper bound of the subset

        The x and y bounds are formatted as integers (%d); the bounds of any
        other axis are inserted as they are (%s).
        """
        axis_name = axis.lower()

        # The class has no logger, so the "untested axis" warning goes to
        # stderr instead of failing on a missing attribute.
        if axis_name not in ("x", "y", "t"):
            sys.stderr.write("WARNING: %s the axis %s is different than x-y-t and was not tested\n" % (self.Request, axis))

        if axis_name in ("x", "y"):
            bounds = "%d,%d" % (minVal, maxVal)
        else:
            bounds = "%s,%s" % (minVal, maxVal)

        # every subset ends with "&" so that further subsets can be chained
        self.subset += "subset%s=%s(%s)&" % (axis.upper(), axis_name, bounds)
        return self.subset

    def getData(self, **kw):
        """Send the GetCoverage request and return the result of the GET.

        Keyword arguments:
        host -- WCS server host name (required)
        port -- WCS server port (required)

        Returns whatever WCSRequestGet.execute() returns for the request.
        """
        host = kw['host']
        port = kw['port']

        # Use the instance attributes set in __init__ so that the request
        # always matches the state of this object.
        request_string = "%sservice=%s&Request=%s&version=%s&CoverageId=%s&%s" % (
            self.url, self.service, self.Request, self.version,
            self.CoverageId, self.subset)

        return WCSRequestGet(host, port).execute(request_string)
| 99 |
|
|---|
class WCPSClient():
    """Build WCPS requests for a coverage and POST them to the server."""

    def __init__(self, **kw):
        """Initialise the client.

        Keyword arguments:
        CoverageId -- identifier of the coverage to process (required, a
                      KeyError is raised when it is missing)
        subset     -- optional dict keyed by axis name; queryGen() reads the
                      'x' and 'y' entries, each a dict with 'min' and 'max'.
                      Defaults to no subset on x, y and t.
        """
        subset = kw.get('subset')
        if subset is None:
            subset = {'x': None, 'y': None, 't': None}
        self.subset = subset

        self.output = "csv"
        self.CoverageId = kw['CoverageId']

    def _trimInterval(self, axis):
        """Return the trim interval text for axis, or None if it has no bounds."""
        try:
            bounds = self.subset[axis]
            if not bounds:
                return None
            return ' %s:"CRS:1"(%s:%s)' % (axis, str(bounds['min']), str(bounds['max']))
        except (KeyError, TypeError):
            # axis not configured, or its bounds are incomplete: no trim
            return None

    def queryGen(self):
        """Build the WCPS abstract-syntax query, store it in self.query and return it.

        The coverage is wrapped in a trim() only when bounds are set for x
        and/or y; the result is encoded in the self.output format.
        """
        intervals = [interval
                     for interval in (self._trimInterval('x'), self._trimInterval('y'))
                     if interval is not None]

        coverage_expr = ' data'
        if intervals:
            # join the intervals so that a single axis leaves no dangling comma
            coverage_expr = ' trim( data, { %s })' % (",".join(intervals))

        self.query = 'for data in (%s) return encode(%s, "%s")' % (
            self.CoverageId, coverage_expr, self.output)
        return self.query

    def xmlGen(self):
        """Return the XML form of the request (not implemented: always "")."""
        return ""

    def postRequest(self, **kw):
        """POST the request to the WCPS server and return the response body.

        Keyword arguments:
        host -- WCPS server host name (required)
        port -- WCPS server port (required)
        type -- "query" for the abstract syntax or "xml" (required)

        Raises ValueError for any other type. When the request itself fails
        the traceback is printed and an empty string is returned.
        """
        host = kw['host']
        port = kw['port']
        WCPS_type = kw['type']

        if WCPS_type == "query":
            WCPS_request = self.queryGen()
        elif WCPS_type == "xml":
            WCPS_request = self.xmlGen()
        else:
            raise ValueError('unsupported WCPS request type %r (expected "query" or "xml")' % (WCPS_type,))

        # default result, so that a failed request still returns a string
        ProcessedCoverage = ""
        try:
            conn = WCPSRequestPost(host, port, WCPS_url)
            ProcessedCoverage = conn.execute(WCPS_type, WCPS_request)
        except Exception:
            print ("Some errors occur retrivin ProcessedCoverage data.\n%s" %(traceback.format_exc()))

        return ProcessedCoverage
| 192 |
|
|---|
| 193 | #implement http request to WCS server
|
|---|
class WCSRequestGet():
    """Send HTTP GET requests to a WCS server."""

    def __init__(self, host, port = WCS_port):
        """Store the host and port (default WCS_port) of the WCS server."""
        self.host = host
        self.port = port

    def execute(self, wcs_request_string):
        """GET wcs_request_string and return the response body.

        Any 2xx status returns the body. For every other status an error
        message is printed and the string "ERROR <status>" is returned.
        """
        conn = httplib.HTTPConnection(self.host+":"+str(self.port))
        try:
            conn.request("GET", wcs_request_string)
            r = conn.getresponse()

            # any 2xx status counts as success
            r_status = r.status
            if 200 <= r_status <= 299:
                xml = r.read()
            else:
                xml = "ERROR %d" %(r_status)
                print ("Unable to get data from WCS server. Bad response status N %d returned by WCS server" %(r_status))
        finally:
            # release the connection even when the request raises
            conn.close()

        return xml
| 215 |
|
|---|
| 216 | #implement http POST request to WCPS server
|
|---|
class WCPSRequestPost():
    """Send HTTP POST requests to a WCPS server."""

    def __init__(self, host, port, url=WCPS_url):
        """Store the host, port and url path (default WCPS_url) of the server."""
        self.host = host
        self.port = port
        self.url = url

    def execute(self, parameter, request_string):
        """POST request_string as form field parameter and return the body.

        The body is returned whatever the response status is; a status
        outside 2xx is only reported with a printed message.
        """
        headers = {"Content-type": "application/x-www-form-urlencoded",
                   "Accept": "text/plain"}
        params = urllib.urlencode({parameter : request_string})

        conn = httplib.HTTPConnection(self.host, self.port)
        try:
            conn.request("POST", self.url, params, headers)
            response = conn.getresponse()
            processedCoverage = response.read()

            # report failures in the same way as the WCS GET request does
            if not 200 <= response.status <= 299:
                print ("Bad response status N %d returned by WCPS server" %(response.status))
        finally:
            # release the connection even when the request raises
            conn.close()

        return processedCoverage
| 241 |
|
|---|
class requestThread(threading.Thread):
    """Thread that performs one request and writes the answer to a file."""

    def __init__(self,application, i):
        """Store the target application ("ras", "wcs" or "wcps") and thread number."""
        threading.Thread.__init__(self)
        self.application = application
        self.th_number = i

    def run(self):
        """Run the request for self.application and save its output.

        The output goes to
        <Output_File_Path>/<application>_output_<thread number>.<ext>,
        where ext is "txt" for wcps and "xml" otherwise. An unknown
        application is reported and nothing is written.
        """
        start = time.time()
        print("Thread %d STARTED" % (self.th_number))

        if self.application == "ras":
            # query rasdaman directly through the rasql command line client
            rasdaman_request = 'rasql -q "select csv(n[%d:%d,%d:%d]) from NIR as n" --out string' %(_min_x, _max_x, _min_y, _max_y)
            # the (status, output) pair is saved in its string form
            data_Out = str(commands.getstatusoutput(rasdaman_request))
            extension = "xml"

        elif self.application == "wcs":
            # WCS GetCoverage request to petascope
            obj = GetCoverage(CoverageId=CoverageId)
            obj.setSubset("x", _min_x, _max_x)
            obj.setSubset("y", _min_y, _max_y)
            data_Out = obj.getData(host=WCS_host, port=WCS_port)
            extension = "xml"

        elif self.application == "wcps":
            # WCPS abstract syntax request to petascope
            obj = WCPSClient(CoverageId=CoverageId)
            obj.subset['x'] = {'min':_min_x, 'max':_max_x, 'EPSG':1}
            obj.subset['y'] = {'min':_min_y, 'max':_max_y, 'EPSG':1}
            data_Out = obj.postRequest(host=WCS_host, port=WCS_port, type="query")
            extension = "txt"

        else:
            # nothing to request: report it instead of failing on unset names
            print('Thread %d ABORTED: unknown application "%s"' % (self.th_number, self.application))
            return

        filename = "%s/%s_output_%d.%s" %(Output_File_Path, self.application, self.th_number, extension)

        # the with statement closes the file even when the write fails
        with open(filename,"w") as out_file:
            out_file.write(data_Out)

        print("Thread %d ENDED in %f seconds" % (self.th_number, time.time()-start))
| 291 |
|
|---|
if __name__ == '__main__':
    # Read and validate the command line: APP must be a known application
    # and N a positive integer.
    try:
        application = sys.argv[1]
        req_number = int(sys.argv[2],10)
        if application not in ("ras", "wcs", "wcps") or req_number < 1:
            raise ValueError("bad arguments")
    except (IndexError, ValueError):
        # IndexError: missing argument; ValueError: N not an integer or the
        # explicit validation failure above
        print("ERROR bad arguments. USAGE:\n\tpython %s APP N" %(sys.argv[0]))
        print('\t\tAPP can be:')
        print('\t\t\t\t"ras" for RASDAMAN,')
        print('\t\t\t\t"wcs" for PETASCOPE wcs2.0,')
        print('\t\t\t\t"wcps" for PETASCOPE wcps abstract syntax\n')
        print('\t\tN is the number of requests generated')
        sys.exit(65)

    # create and start one thread per request
    thread_list = []
    for i in range(0, req_number):
        worker = requestThread(application, i)
        thread_list.append(worker)
        worker.start()

    # wait for all the requests to finish
    for worker in thread_list:
        worker.join()

    print("COMPLETE!")

    sys.exit(0)
| 321 |
|
|---|