| Home | Trees | Indices | Help |
|
|---|
|
|
# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
21 22 import boto 23 from boto.services.message import ServiceMessage 24 from boto.services.servicedef import ServiceDef 25 from boto.pyami.scriptbase import ScriptBase 26 from boto.exception import S3ResponseError 27 from boto.utils import get_ts 28 import StringIO 29 import time 30 import os 31 import sys, traceback 32 import mimetypes 3335 36 # Time required to process a transaction 37 ProcessingTime = 60 3816440 ScriptBase.__init__(self, config_file) 41 self.name = self.__class__.__name__ 42 self.working_dir = boto.config.get('Pyami', 'working_dir') 43 self.sd = ServiceDef(config_file) 44 self.retry_count = self.sd.getint('retry_count', 5) 45 self.loop_delay = self.sd.getint('loop_delay', 30) 46 self.processing_time = self.sd.getint('processing_time', 60) 47 self.input_queue = self.sd.get_obj('input_queue') 48 self.output_queue = self.sd.get_obj('output_queue') 49 self.output_domain = self.sd.get_obj('output_domain') 50 if mimetype_files: 51 mimetypes.init(mimetype_files)5254 if key.find(';') < 0: 55 t = (key, '') 56 else: 57 key, type = key.split(';') 58 label, mtype = type.split('=') 59 t = (key, mtype) 60 return t6163 boto.log.info('read_message') 64 message = self.input_queue.read(self.processing_time) 65 if message: 66 boto.log.info(message.get_body()) 67 key = 'Service-Read' 68 message[key] = get_ts() 69 return message70 71 # retrieve the source file from S373 bucket_name = message['Bucket'] 74 key_name = message['InputKey'] 75 file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file')) 76 boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name)) 77 bucket = boto.lookup('s3', bucket_name) 78 key = bucket.new_key(key_name) 79 key.get_contents_to_filename(os.path.join(self.working_dir, file_name)) 80 return file_name81 82 # process source file, return list of output files 85 86 # store result file in S388 boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name)) 89 bucket = boto.lookup('s3', 
bucket_name) 90 key = bucket.new_key(key_name) 91 key.set_contents_from_filename(file_path) 92 return key9395 output_keys = [] 96 for file, type in results: 97 if input_message.has_key('OutputBucket'): 98 output_bucket = input_message['OutputBucket'] 99 else: 100 output_bucket = input_message['Bucket'] 101 key_name = os.path.split(file)[1] 102 key = self.put_file(output_bucket, file, key_name) 103 output_keys.append('%s;type=%s' % (key.name, type)) 104 output_message['OutputKey'] = ','.join(output_keys)105 106 # write message to each output queue108 message['Service-Write'] = get_ts() 109 message['Server'] = self.name 110 if os.environ.has_key('HOSTNAME'): 111 message['Host'] = os.environ['HOSTNAME'] 112 else: 113 message['Host'] = 'unknown' 114 message['Instance-ID'] = self.instance_id 115 if self.output_queue: 116 boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id) 117 self.output_queue.write(message) 118 if self.output_domain: 119 boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name) 120 item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']]) 121 self.output_domain.put_attributes(item_name, message)122 123 # delete message from input queue125 boto.log.info('deleting message from %s' % self.input_queue.id) 126 self.input_queue.delete_message(message)127 128 # to clean up any files, etc. 
after each iteration 131133 on_completion = self.sd.get('on_completion', 'shutdown') 134 if on_completion == 'shutdown': 135 if self.instance_id: 136 time.sleep(60) 137 c = boto.connect_ec2() 138 c.terminate_instances([self.instance_id])139141 self.notify('Service: %s Starting' % self.name) 142 empty_reads = 0 143 while self.retry_count < 0 or empty_reads < self.retry_count: 144 try: 145 input_message = self.read_message() 146 if input_message: 147 empty_reads = 0 148 output_message = ServiceMessage(None, input_message.get_body()) 149 input_file = self.get_file(input_message) 150 results = self.process_file(input_file, output_message) 151 self.save_results(results, input_message, output_message) 152 self.write_message(output_message) 153 self.delete_message(input_message) 154 self.cleanup() 155 else: 156 empty_reads += 1 157 time.sleep(self.loop_delay) 158 except Exception, e: 159 boto.log.exception('Service Failed') 160 empty_reads += 1 161 self.create_connections() 162 self.notify('Service: %s Shutting Down' % self.name) 163 self.shutdown()
| Home | Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Wed Dec 10 07:39:49 2008 | http://epydoc.sourceforge.net |