Package boto :: Package services :: Module service
[hide private]
[frames] | [no frames]

Source Code for Module boto.services.service

  1  # Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ 
  2  # 
  3  # Permission is hereby granted, free of charge, to any person obtaining a 
  4  # copy of this software and associated documentation files (the 
  5  # "Software"), to deal in the Software without restriction, including 
  6  # without limitation the rights to use, copy, modify, merge, publish, dis- 
  7  # tribute, sublicense, and/or sell copies of the Software, and to permit 
  8  # persons to whom the Software is furnished to do so, subject to the fol- 
  9  # lowing conditions: 
 10  # 
 11  # The above copyright notice and this permission notice shall be included 
 12  # in all copies or substantial portions of the Software. 
 13  # 
 14  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
 15  # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 
 16  # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 
 17  # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,  
 18  # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 
 19  # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 
 20  # IN THE SOFTWARE. 
 21   
 22  import boto 
 23  from boto.services.message import ServiceMessage 
 24  from boto.services.servicedef import ServiceDef 
 25  from boto.pyami.scriptbase import ScriptBase 
 26  from boto.exception import S3ResponseError 
 27  from boto.utils import get_ts 
 28  import StringIO 
 29  import time 
 30  import os 
 31  import sys, traceback 
 32  import mimetypes 
 33   
class Service(ScriptBase):
    """Queue-driven worker that fetches a file from S3, processes it and
    publishes the results.

    Each iteration of :meth:`main` reads one message from the input queue,
    downloads the S3 object it names, hands the local file to
    :meth:`process_file`, uploads whatever files that returns, writes an
    output message and finally deletes the input message.  As written here
    :meth:`process_file` returns no results and :meth:`cleanup` does nothing;
    they appear intended as hooks for subclasses.

    NOTE(review): ``self.instance_id``, ``self.notify`` and
    ``self.create_connections`` are used below but not defined in this
    class -- presumably they are provided by ``ScriptBase``; confirm.
    """

    # Time required to process a transaction
    ProcessingTime = 60

    def __init__(self, config_file=None, mimetype_files=None):
        """Load the service definition and look up its queues/domain.

        :param config_file: passed to both ``ScriptBase.__init__`` and
            ``ServiceDef``.
        :param mimetype_files: optional value handed to ``mimetypes.init``
            when truthy.
        """
        ScriptBase.__init__(self, config_file)
        self.name = self.__class__.__name__
        self.working_dir = boto.config.get('Pyami', 'working_dir')
        self.sd = ServiceDef(config_file)
        self.retry_count = self.sd.getint('retry_count', 5)
        self.loop_delay = self.sd.getint('loop_delay', 30)
        self.processing_time = self.sd.getint('processing_time', 60)
        self.input_queue = self.sd.get_obj('input_queue')
        self.output_queue = self.sd.get_obj('output_queue')
        self.output_domain = self.sd.get_obj('output_domain')
        if mimetype_files:
            mimetypes.init(mimetype_files)

    @staticmethod
    def split_key(key):
        """Split ``'<key>;<label>=<mimetype>'`` into ``(key, mimetype)``.

        A key without a ``';'`` yields ``(key, '')``.  This is the inverse of
        the ``'%s;type=%s'`` entries built by :meth:`save_results`.

        Declared as a static method because it takes no ``self``; without
        the decorator it could not be called through an instance.

        :raises ValueError: if the key contains more than one ``';'`` or the
            part after it does not contain exactly one ``'='``.
        """
        if key.find(';') < 0:
            return (key, '')
        key, type_spec = key.split(';')
        _label, mtype = type_spec.split('=')
        return (key, mtype)

    def read_message(self):
        """Read one message from the input queue.

        ``self.processing_time`` is passed to the queue's ``read`` call.  A
        truthy message is logged and stamped with a ``'Service-Read'``
        timestamp before being returned; a falsy result is returned as-is.
        """
        boto.log.info('read_message')
        message = self.input_queue.read(self.processing_time)
        if message:
            boto.log.info(message.get_body())
            message['Service-Read'] = get_ts()
        return message

    # retrieve the source file from S3
    def get_file(self, message):
        """Download the S3 object named by *message* into the working dir.

        The bucket and key come from ``message['Bucket']`` and
        ``message['InputKey']``; the local name is
        ``message['OriginalFileName']`` if present, else ``'in_file'``.

        :returns: the local path the object was written to.
        """
        bucket_name = message['Bucket']
        key_name = message['InputKey']
        file_name = os.path.join(self.working_dir,
                                 message.get('OriginalFileName', 'in_file'))
        boto.log.info('get_file: %s/%s to %s'
                      % (bucket_name, key_name, file_name))
        bucket = boto.lookup('s3', bucket_name)
        key = bucket.new_key(key_name)
        # file_name already includes working_dir; joining it a second time
        # would write to <wd>/<wd>/<name> for a relative working_dir while
        # this method returns <wd>/<name>.
        key.get_contents_to_filename(file_name)
        return file_name

    # process source file, return list of output files
    def process_file(self, in_file_name, msg):
        """Process the downloaded file; this implementation returns ``[]``.

        :meth:`save_results` unpacks each returned item as a
        ``(file_path, type)`` pair.
        """
        return []

    # store result file in S3
    def put_file(self, bucket_name, file_path, key_name=None):
        """Upload *file_path* to *bucket_name* under *key_name*.

        :returns: the key object the file contents were stored under.
        """
        boto.log.info('putting file %s as %s.%s'
                      % (file_path, bucket_name, key_name))
        bucket = boto.lookup('s3', bucket_name)
        key = bucket.new_key(key_name)
        key.set_contents_from_filename(file_path)
        return key

    def save_results(self, results, input_message, output_message):
        """Upload every result file and record the keys on *output_message*.

        Files go to ``input_message['OutputBucket']`` when that key exists,
        otherwise to ``input_message['Bucket']``.  Each upload is keyed by the
        file's base name.  ``output_message['OutputKey']`` is set to a
        comma-separated list of ``'<key name>;type=<type>'`` entries (an
        empty string when *results* is empty).
        """
        output_keys = []
        for file_path, file_type in results:
            # has_key is kept on purpose: the message class is defined
            # elsewhere and is not shown to support the ``in`` operator.
            if input_message.has_key('OutputBucket'):
                output_bucket = input_message['OutputBucket']
            else:
                output_bucket = input_message['Bucket']
            key_name = os.path.split(file_path)[1]
            key = self.put_file(output_bucket, file_path, key_name)
            output_keys.append('%s;type=%s' % (key.name, file_type))
        output_message['OutputKey'] = ','.join(output_keys)

    # write message to each output queue
    def write_message(self, message):
        """Stamp *message* and publish it to the configured outputs.

        Adds ``'Service-Write'``, ``'Server'``, ``'Host'`` (the ``HOSTNAME``
        environment variable, or ``'unknown'``) and ``'Instance-ID'``.  The
        message is then written to the output queue and/or stored in the
        output domain, whichever of the two is truthy.
        """
        message['Service-Write'] = get_ts()
        message['Server'] = self.name
        message['Host'] = os.environ.get('HOSTNAME', 'unknown')
        message['Instance-ID'] = self.instance_id
        if self.output_queue:
            boto.log.info('Writing message to SQS queue: %s'
                          % self.output_queue.id)
            self.output_queue.write(message)
        if self.output_domain:
            boto.log.info('Writing message to SDB domain: %s'
                          % self.output_domain.name)
            item_name = '/'.join([message['Service-Write'],
                                  message['Bucket'],
                                  message['InputKey']])
            self.output_domain.put_attributes(item_name, message)

    # delete message from input queue
    def delete_message(self, message):
        """Delete *message* from the input queue."""
        boto.log.info('deleting message from %s' % self.input_queue.id)
        self.input_queue.delete_message(message)

    # to clean up any files, etc. after each iteration
    def cleanup(self):
        """Per-iteration clean-up hook; does nothing here."""
        pass

    def shutdown(self):
        """Terminate this EC2 instance if the service definition asks for it.

        Acts only when the ``on_completion`` setting (default ``'shutdown'``)
        equals ``'shutdown'`` and ``self.instance_id`` is truthy; it then
        sleeps 60 seconds before issuing the terminate request.
        """
        on_completion = self.sd.get('on_completion', 'shutdown')
        if on_completion == 'shutdown' and self.instance_id:
            time.sleep(60)
            c = boto.connect_ec2()
            c.terminate_instances([self.instance_id])

    def main(self, notify=False):
        """Run the read/process/write loop, then notify and shut down.

        The loop ends once ``retry_count`` consecutive iterations have been
        empty reads or failures; a negative ``retry_count`` loops forever.
        A successful read resets the counter.  Only an empty read sleeps for
        ``loop_delay`` seconds; a failure is logged and retried immediately.

        :param notify: accepted for interface compatibility; not used by
            this method.
        """
        self.notify('Service: %s Starting' % self.name)
        empty_reads = 0
        while self.retry_count < 0 or empty_reads < self.retry_count:
            try:
                input_message = self.read_message()
                if input_message:
                    empty_reads = 0
                    output_message = ServiceMessage(
                        None, input_message.get_body())
                    input_file = self.get_file(input_message)
                    results = self.process_file(input_file, output_message)
                    self.save_results(results, input_message, output_message)
                    self.write_message(output_message)
                    self.delete_message(input_message)
                    self.cleanup()
                else:
                    empty_reads += 1
                    time.sleep(self.loop_delay)
            except Exception:
                # Top-level boundary: log the traceback and keep the service
                # alive; the failure counts towards the retry limit.
                boto.log.exception('Service Failed')
                empty_reads += 1
                # NOTE(review): create_connections is not defined in this
                # class -- presumably inherited; confirm it exists.
                self.create_connections()
        self.notify('Service: %s Shutting Down' % self.name)
        self.shutdown()
164