]> git.pld-linux.org Git - projects/pld-builder.new.git/blob - PLD_Builder/file_sender.py
- whitespace cleanup
[projects/pld-builder.new.git] / PLD_Builder / file_sender.py
1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
2
3 import glob
4 import re
5 import string
6 import os
7 import time
8 import shutil
9 import sys
10 import traceback
11 import urllib2
12
13 from config import config, init_conf
14 import mailer
15 import path
16 import log
17 import loop
18 import status
19 import lock
20
21 retries_times = [5 * 60, 5 * 60, 10 * 60, 10 * 60, 30 * 60, 60 * 60]
22
23 def read_name_val(file):
24     f = open(file)
25     r = {'_file': file[:-5], '_desc': file}
26     rx = re.compile(r"^([^:]+)\s*:(.*)$")
27     for l in f.xreadlines():
28         if l == "END\n":
29             f.close()
30             return r
31         m = rx.search(l)
32         if m:
33             r[m.group(1)] = string.strip(m.group(2))
34         else:
35             break
36     f.close()
37     return None
38
39 def scp_file(src, target):
40     global problems
41     f = os.popen("scp -v -B %s %s 2>&1 < /dev/null" % (src, target))
42     p = f.read()
43     ret = f.close()
44     if ret:
45         problems[src] = p
46     return ret
47
48 def copy_file(src, target):
49     try:
50         shutil.copyfile(src, target)
51         return 0
52     except:
53         global problems
54         exctype, value = sys.exc_info()[:2]
55         problem[src] = "cannot copy file: %s" % traceback.format_exception_only(exctype, value)
56         return 1
57
58 def rsync_file(src, target, host):
59     global problems
60     p = open(path.rsync_password_file, "r")
61     password = None
62     for l in p.xreadlines():
63         l = string.split(l)
64         if len(l) >= 2 and l[0] == host:
65             password = l[1]
66     p.close()
67     rsync = "rsync --verbose --archive"
68     if password != None:
69         p = open(".rsync.pass", "w")
70         os.chmod(".rsync.pass", 0600)
71         p.write("%s\n" % password)
72         p.close()
73         rsync += " --password-file .rsync.pass"
74     f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
75     p = f.read()
76     if password != None:
77         os.unlink(".rsync.pass")
78     ret = f.close()
79     if ret:
80         problems[src] = p
81     return ret
82
83 def rsync_ssh_file(src, target):
84     global problems
85     rsync = "rsync --verbose --archive -e ssh"
86     f = os.popen("%s %s %s 2>&1 < /dev/null" % (rsync, src, target))
87     p = f.read()
88     ret = f.close()
89     if ret:
90         problems[src] = p
91     return ret
92
93 def post_file(src, url):
94     global problems
95     try:
96         f = open(src, 'r')
97         data = f.read()
98         f.close()
99         req = urllib2.Request(url, data)
100         req.add_header('X-Filename', os.path.basename(src))
101         f = urllib2.urlopen(req)
102         f.close()
103     except Exception, e:
104         problems[src] = e
105         return e
106     return 0
107
108 def send_file(src, target):
109     global problems
110     try:
111         log.notice("sending %s to %s (size %d bytes)" % (src, target, os.stat(src).st_size))
112         m = re.match('rsync://([^/]+)/.*', target)
113         if m:
114             return not rsync_file(src, target, host = m.group(1))
115         if target != "" and target[0] == '/':
116             return not copy_file(src, target)
117         m = re.match('scp://([^@:]+@[^/:]+)(:|)(.*)', target)
118         if m:
119             return not scp_file(src, m.group(1) + ":" + m.group(3))
120         m = re.match('ssh\+rsync://([^@:]+@[^/:]+)(:|)(.*)', target)
121         if m:
122             return not rsync_ssh_file(src, m.group(1) + ":" + m.group(3))
123         m = re.match('http://.*', target)
124         if m:
125             return not post_file(src, target)
126         log.alert("unsupported protocol: %s" % target)
127     except OSError, e:
128         problems[src] = e
129         log.error("send_file(%s, %s): %s" % (src, target, e))
130         return False
131     return True
132
133 def maybe_flush_queue(dir):
134     retry_delay = 0
135     try:
136         f = open(dir + "/retry-at")
137         last_retry = int(string.strip(f.readline()))
138         retry_delay = int(string.strip(f.readline()))
139         f.close()
140         if last_retry + retry_delay > time.time():
141             return
142         os.unlink(dir + "/retry-at")
143     except:
144         pass
145
146     status.push("flushing %s" % dir)
147
148     if flush_queue(dir):
149         f = open(dir + "/retry-at", "w")
150         if retry_delay in retries_times:
151             idx = retries_times.index(retry_delay)
152             if idx < len(retries_times) - 1: idx += 1
153         else:
154             idx = 0
155         f.write("%d\n%d\n" % (time.time(), retries_times[idx]))
156         f.close()
157
158     status.pop()
159
160 def flush_queue(dir):
161     q = []
162     os.chdir(dir)
163     for f in glob.glob(dir + "/*.desc"):
164         d = read_name_val(f)
165         if d != None: q.append(d)
166     def mycmp(x, y):
167         rc = cmp(x['Time'], y['Time'])
168         if rc == 0 and x.has_key('Type') and y.has_key('Type'):
169             return cmp(x['Type'], y['Type'])
170         else:
171             return rc
172     q.sort(mycmp)
173
174     error = None
175     # copy of q
176     remaining = q[:]
177     for d in q:
178         if not send_file(d['_file'], d['Target']):
179             error = d
180             continue
181         if os.access(d['_file'] + ".info", os.F_OK):
182             if not send_file(d['_file'] + ".info", d['Target'] + ".info"):
183                 error = d
184                 continue
185             os.unlink(d['_file'] + ".info")
186         os.unlink(d['_file'])
187         os.unlink(d['_desc'])
188         remaining.remove(d)
189
190     if error != None:
191         emails = {}
192         emails[config.admin_email] = 1
193         pr = ""
194         for src, msg in problems.iteritems():
195             pr = pr + "[src: %s]\n\n%s\n" % (src, msg)
196         for d in remaining:
197             if d.has_key('Requester'):
198                 emails[d['Requester']] = 1
199         e = emails.keys()
200         m = mailer.Message()
201         m.set_headers(to = string.join(e, ", "),
202                       subject = "[%s] builder queue problem" % config.builder)
203         m.write("there were problems sending files from queue %s:\n" % dir)
204         m.write("problems:\n")
205         m.write("%s\n" % pr)
206         m.send()
207         log.error("error sending files from %s:\n%s\n" % (dir, pr))
208         return 1
209
210     return 0
211
212 problems = {}
213
214 def main():
215     if lock.lock("sending-files", non_block = 1) == None:
216         return
217     init_conf()
218     maybe_flush_queue(path.notify_queue_dir)
219     maybe_flush_queue(path.buildlogs_queue_dir)
220     maybe_flush_queue(path.ftp_queue_dir)
221
222 if __name__ == '__main__':
223     loop.run_loop(main)
This page took 0.034522 seconds and 3 git commands to generate.