-import ConfigParser
+# vi: encoding=utf-8 ts=8 sts=4 sw=4 et
+
import string
+import signal
import os
import urllib
+import urllib2
import StringIO
import sys
-
import gzip
import path
import util
import gpg
import request
+import loop
+import socket
from acl import acl
from bqueue import B_Queue
+from config import config, init_conf
last_count = 0
+def alarmalarm(signum, frame):
+ raise IOError, 'TCP connection hung'
+
def has_new(control_url):
- global last_count
- cnt_f = open(path.last_req_no_file)
- last_count = int(string.strip(cnt_f.readline()))
- cnt_f.close()
- f = urllib.urlopen(control_url + "/max_req_no")
- res = 0
- if int(string.strip(f.readline())) != last_count:
- res = 1
- f.close()
- return res
+ global last_count
+ cnt_f = open(path.last_req_no_file)
+ try:
+ last_count = int(string.strip(cnt_f.readline()))
+ except ValueError, e:
+ last_count = 0
+
+ cnt_f.close()
+ f = None
+ socket.setdefaulttimeout(240)
+ signal.signal(signal.SIGALRM, alarmalarm)
+ signal.alarm(300)
+ try:
+ headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
+ req = urllib2.Request(url=control_url + "/max_req_no", headers=headers)
+ f = urllib2.urlopen(req)
+ count = int(string.strip(f.readline()))
+ signal.alarm(0)
+ except Exception, e:
+ signal.alarm(0)
+ log.error("can't fetch %s: %s" % (control_url + "/max_req_no", e))
+ sys.exit(1)
+ res = 0
+ if count != last_count:
+ res = 1
+ f.close()
+ return res
-def fetch_queue(control_url, queue_signed_by):
- f = urllib.urlopen(control_url + "/queue.gz")
- sio = StringIO.StringIO()
- util.sendfile(f, sio)
- f.close()
- sio.seek(0)
- f = gzip.GzipFile(fileobj = sio)
- (signers, body) = gpg.verify_sig(f)
- u = acl.user_by_email(signers)
- if u == None:
- log.alert("queue.gz not signed with signature of valid user: %s" % signers)
- sys.exit(1)
- if u.login != queue_signed_by:
- log.alert("queue.gz should be signed by %s not by %s" \
- % (queue_signed_by, u.login))
- sys.exit(1)
- body.seek(0)
- return request.parse_requests(body)
+def fetch_queue(control_url):
+ signal.signal(signal.SIGALRM, alarmalarm)
+ socket.setdefaulttimeout(240)
+ signal.alarm(300)
+ try:
+ headers = { 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' }
+ req = urllib2.Request(url=control_url + "/queue.gz", headers=headers)
+ f = urllib2.urlopen(req)
+ signal.alarm(0)
+ except Exception, e:
+ signal.alarm(0)
+ log.error("can't fetch %s: %s" % (control_url + "/queue.gz", e))
+ sys.exit(1)
+ sio = StringIO.StringIO()
+ util.sendfile(f, sio)
+ f.close()
+ sio.seek(0)
+ f = gzip.GzipFile(fileobj = sio)
+ (signers, body) = gpg.verify_sig(f.read())
+ u = acl.user_by_email(signers)
+ if u == None:
+ log.alert("queue.gz not signed with signature of valid user: %s" % signers)
+ sys.exit(1)
+ if not u.can_do("sign_queue", "all"):
+ log.alert("user %s is not allowed to sign my queue" % u.login)
+ sys.exit(1)
+ return request.parse_requests(body)
def handle_reqs(builder, reqs):
- qpath = path.queue_file + "-" + builder
- if not os.access(qpath, os.F_OK):
- util.append_to(qpath, "<queue/>\n")
- q = B_Queue(qpath)
- q.lock(0)
- q.read()
- for r in reqs:
- if r.kind != 'group':
- raise 'handle_reqs: fatal: huh? %s' % r.kind
- need_it = 0
- for b in r.batches:
- if builder in b.builders:
- need_it = 1
- if need_it:
- log.notice("queued %s (%d) for %s" % (r.id, r.no, builder))
- q.add(r)
- q.write()
- q.unlock()
+ qpath = path.queue_file + "-" + builder
+ if not os.access(qpath, os.F_OK):
+ util.append_to(qpath, "<queue/>\n")
+ q = B_Queue(qpath)
+ q.lock(0)
+ q.read()
+ for r in reqs:
+ if r.kind != 'group':
+ raise Exception, 'handle_reqs: fatal: huh? %s' % r.kind
+ need_it = 0
+ for b in r.batches:
+ if builder in b.builders:
+ need_it = 1
+ if need_it:
+ log.notice("queued %s (%d) for %s" % (r.id, r.no, builder))
+ q.add(r)
+ q.write()
+ q.unlock()
def main():
- os.environ['LC_ALL'] = "C"
- lock.lock("request_fetcher")
-
- status.push("reading builder config")
- p = ConfigParser.ConfigParser()
- p.readfp(open(path.builder_conf))
- builders = string.split(p.get("all", "builders"))
- control_url = p.get("all", "control_url")
- queue_signed_by = p.get("all", "queue_signed_by")
- status.pop()
-
- status.push("fetching requests")
- if has_new(control_url):
- q = fetch_queue(control_url, queue_signed_by)
- max_no = 0
- q_new = []
- for r in q:
- if r.no > max_no:
- max_no = r.no
- if r.no > last_count:
- q_new.append(r)
- for b in builders:
- handle_reqs(b, q_new)
- f = open(path.last_req_no_file, "w")
- f.write("%d\n" % max_no)
- f.close()
- status.pop()
+ lck = lock.lock("request_fetcher", non_block = True)
+ if lck == None:
+ sys.exit(1)
+ init_conf()
+ acl.try_reload()
+
+ status.push("fetching requests")
+ if has_new(config.control_url):
+ q = fetch_queue(config.control_url)
+ max_no = 0
+ q_new = []
+ for r in q:
+ if r.no > max_no:
+ max_no = r.no
+ if r.no > last_count:
+ q_new.append(r)
+ for b in config.binary_builders:
+ handle_reqs(b, q_new)
+ f = open(path.last_req_no_file, "w")
+ f.write("%d\n" % max_no)
+ f.close()
+ status.pop()
+ lck.close()
-util.wrap(main)
+if __name__ == '__main__':
+ # http connection is established (and few bytes transferred through it)
+ # each $secs seconds.
+ loop.run_loop(main, secs = 10)