1 # vi: encoding=utf-8 ts=8 sts=4 sw=4 et
21 from bqueue import B_Queue
22 from config import config, init_conf
26 def alarmalarm(signum, frame):
27 raise IOError, 'TCP connection hung'
29 def has_new(control_url):
31 cnt_f = open(path.last_req_no_file)
33 last_count = int(string.strip(cnt_f.readline()))
39 socket.setdefaulttimeout(240)
40 signal.signal(signal.SIGALRM, alarmalarm)
43 f = urllib2.urlopen(control_url + "/max_req_no")
44 count = int(string.strip(f.readline()))
48 log.error("can't fetch %s: %s" % (control_url + "/max_req_no", e))
51 if count != last_count:
56 def fetch_queue(control_url):
57 signal.signal(signal.SIGALRM, alarmalarm)
58 socket.setdefaulttimeout(240)
61 f = urllib2.urlopen(control_url + "/queue.gz")
65 log.error("can't fetch %s: %s" % (control_url + "/queue.gz", e))
67 sio = StringIO.StringIO()
71 f = gzip.GzipFile(fileobj = sio)
72 (signers, body) = gpg.verify_sig(f.read())
73 u = acl.user_by_email(signers)
75 log.alert("queue.gz not signed with signature of valid user: %s" % signers)
77 if not u.can_do("sign_queue", "all"):
78 log.alert("user %s is not allowed to sign my queue" % u.login)
80 return request.parse_requests(body)
82 def handle_reqs(builder, reqs):
83 qpath = path.queue_file + "-" + builder
84 if not os.access(qpath, os.F_OK):
85 util.append_to(qpath, "<queue/>\n")
91 raise Exception, 'handle_reqs: fatal: huh? %s' % r.kind
94 if builder in b.builders:
97 log.notice("queued %s (%d) for %s" % (r.id, r.no, builder))
103 lck = lock.lock("request_fetcher", non_block = True)
108 status.push("fetching requests")
109 if has_new(config.control_url):
110 q = fetch_queue(config.control_url)
116 if r.no > last_count:
118 for b in config.binary_builders:
119 handle_reqs(b, q_new)
120 f = open(path.last_req_no_file, "w")
121 f.write("%d\n" % max_no)
126 if __name__ == '__main__':
127 # http connection is established (and few bytes transferred through it)
128 # each $secs seconds.
129 loop.run_loop(main, secs = 10)