+--- a/slug.py 2014-10-19 18:07:38.000000000 +0200
++++ b/slug.py 2014-11-20 22:14:19.182018701 +0100
+@@ -7,26 +7,18 @@
+ import shutil
+ import subprocess
+ import queue
+-import threading
+-
++import multiprocessing
+ import argparse
+
+ import signal
+ import configparser
+
++from multiprocessing import Pool as WorkerPool
++
+ from git_slug.gitconst import GITLOGIN, GITSERVER, GIT_REPO, GIT_REPO_PUSH, REMOTE_NAME, REMOTEREFS
+ from git_slug.gitrepo import GitRepo, GitRepoError
+ from git_slug.refsdata import GitArchiveRefsData, NoMatchedRepos, RemoteRefsError
+
+-class Store():
+- def __init__(self):
+- self.lock = threading.Lock()
+- self.items = []
+-
+- def put(self, item):
+- with self.lock:
+- self.items.append(item)
+-
+ class UnquoteConfig(configparser.ConfigParser):
+ def get(self, section, option, **kwargs):
+ value = super().get(section, option, **kwargs)
+@@ -43,25 +35,15 @@
+ item.append(values)
+ setattr(namespace, self.dest, item)
+
+-class ThreadFetch(threading.Thread):
+- def __init__(self, queue, output, pkgdir, depth=0):
+- threading.Thread.__init__(self)
+- self.queue = queue
+- self.packagesdir = pkgdir
+- self.depth = depth
+- self.output = output
+-
+- def run(self):
+- while True:
+- (gitrepo, ref2fetch) = self.queue.get()
+- try:
+- (stdout, stderr) = gitrepo.fetch(ref2fetch, self.depth)
+- if stderr != b'':
+- print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
+- self.output.put(gitrepo)
+- except GitRepoError as e:
+- print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
+- self.queue.task_done()
++def cpu_count():
++ try:
++ return multiprocessing.cpu_count()
++ except NotImplementedError:
++ pass
++ return 4
++
++def pool_worker_init():
++ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ def readconfig(path):
+ config = UnquoteConfig(delimiters='=', interpolation=None, strict=False)
+@@ -114,18 +96,19 @@
+ sys.exit(2)
+ return refs
+
++def fetch_package(gitrepo, ref2fetch, options):
++ try:
++ (stdout, stderr) = gitrepo.fetch(ref2fetch, options.depth)
++ if stderr != b'':
++ print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
++ return gitrepo
++ except GitRepoError as e:
++ print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
++
+ def fetch_packages(options, return_all=False):
+- fetch_queue = queue.Queue()
+- updated_repos = Store()
+- for i in range(options.jobs):
+- t = ThreadFetch(fetch_queue, updated_repos, options.packagesdir, options.depth)
+- t.setDaemon(True)
+- t.start()
+-
+- signal.signal(signal.SIGINT, signal.SIG_DFL)
+-
+ refs = getrefs(options.branch, options.repopattern)
+ print('Read remotes data')
++ args = []
+ for pkgdir in sorted(refs.heads):
+ gitdir = os.path.join(options.packagesdir, pkgdir, '.git')
+ if not os.path.isdir(gitdir):
+@@ -143,9 +126,18 @@
+ ref2fetch.append('+{}:{}/{}'.format(ref, REMOTEREFS, ref[len('refs/heads/'):]))
+ if ref2fetch:
+ ref2fetch.append('refs/notes/*:refs/notes/*')
+- fetch_queue.put((gitrepo, ref2fetch))
++ args.append((gitrepo, ref2fetch, options))
++
++ pool = WorkerPool(options.jobs, pool_worker_init)
++ try:
++ updated_repos = pool.starmap(fetch_package, args)
++ except KeyboardInterrupt:
++ pool.terminate()
++ else:
++ pool.close()
++ pool.join()
+
+- fetch_queue.join()
++ updated_repos = list(filter(None, updated_repos))
+
+ if options.prune:
+ refs = getrefs('*')
+@@ -158,26 +150,60 @@
+ if return_all:
+ return refs.heads
+ else:
+- return updated_repos.items
++ return updated_repos
++
++def checkout_package(repo, options):
++ try:
++ repo.checkout(options.checkout)
++ except GitRepoError as e:
++ print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
+
+ def checkout_packages(options):
+ if options.checkout is None:
+ options.checkout = "/".join([REMOTE_NAME, options.branch[0]])
+ fetch_packages(options)
+ refs = getrefs(options.branch, options.repopattern)
++ repos = []
+ for pkgdir in sorted(refs.heads):
+- repo = GitRepo(os.path.join(options.packagesdir, pkgdir))
+- try:
+- repo.checkout(options.checkout)
+- except GitRepoError as e:
+- print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
++ repos.append((GitRepo(os.path.join(options.packagesdir, pkgdir)), options.checkout))
++ pool = multiprocessing.Poll(options.jobs)
++ try:
++ pool.starmap(checkout_package, zip(repos, [options] * len(repos)))
++ except KeyboardInterrupt:
++ pool.terminate()
++ else:
++ pool.close()
++ pool.join()
++
++def clone_package(repo, options):
++ try:
++ repo.checkout('master')
++ except GitRepoError as e:
++ print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
+
+ def clone_packages(options):
+- for repo in fetch_packages(options):
+- try:
+- repo.checkout('master')
+- except GitRepoError as e:
+- print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
++ repos = fetch_packages(options)
++ pool = multiprocessing.Poll(options.jobs)
++ try:
++ pool.starmap(clone_package, zip(repos, [options] * len(repos)))
++ except KeyboardInterrupt:
++ pool.terminate()
++ else:
++ pool.close()
++ pool.join()
++
++def pull_package(gitrepo, options):
++ directory = os.path.basename(gitrepo.wtree)
++ try:
++ (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
++ sha1 = out.decode().strip()
++ (out, err) = gitrepo.commandexc(['rebase', sha1])
++ for line in out.decode().splitlines():
++ print(directory,":",line)
++ except GitRepoError as e:
++ for line in e.args[0].splitlines():
++ print("{}: {}".format(directory,line))
++ pass
+
+ def pull_packages(options):
+ repolist = []
+@@ -189,19 +215,14 @@
+ else:
+ repolist = fetch_packages(options, False)
+ print('--------Pulling------------')
+- for gitrepo in repolist:
+- directory = os.path.basename(gitrepo.wtree)
+- try:
+- (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
+- sha1 = out.decode().strip()
+- (out, err) = gitrepo.commandexc(['rebase', sha1])
+- for line in out.decode().splitlines():
+- print(directory,":",line)
+- except GitRepoError as e:
+- for line in e.args[0].splitlines():
+- print("{}: {}".format(directory,line))
+- pass
+-
++ pool = WorkerPool(options.jobs, pool_worker_init)
++ try:
++ pool.starmap(pull_package, zip(repolist, [options] * len(repolist)))
++ except KeyboardInterrupt:
++ pool.terminate()
++ else:
++ pool.close()
++ pool.join()
+
+ def list_packages(options):
+ refs = getrefs(options.branch, options.repopattern)
+@@ -213,7 +234,7 @@
+ default=os.path.expanduser('~/rpm/packages'))
+
+ common_fetchoptions = argparse.ArgumentParser(add_help=False, parents=[common_options])
+-common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=4, type=int)
++common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=cpu_count(), type=int)
+ common_fetchoptions.add_argument('repopattern', nargs='*', default = ['*'])
+ common_fetchoptions.add_argument('--depth', help='depth of fetch', default=0)
+