1 --- a/slug.py 2014-10-19 18:07:38.000000000 +0200
2 +++ b/slug.py 2014-11-20 22:31:39.005919529 +0100
3 @@ -7,26 +7,18 @@ import os
9 +import multiprocessing
15 +from multiprocessing import Pool as WorkerPool
17 from git_slug.gitconst import GITLOGIN, GITSERVER, GIT_REPO, GIT_REPO_PUSH, REMOTE_NAME, REMOTEREFS
18 from git_slug.gitrepo import GitRepo, GitRepoError
19 from git_slug.refsdata import GitArchiveRefsData, NoMatchedRepos, RemoteRefsError
23 - self.lock = threading.Lock()
26 - def put(self, item):
28 - self.items.append(item)
30 class UnquoteConfig(configparser.ConfigParser):
31 def get(self, section, option, **kwargs):
32 value = super().get(section, option, **kwargs)
33 @@ -43,25 +35,15 @@ class DelAppend(argparse._AppendAction):
35 setattr(namespace, self.dest, item)
37 -class ThreadFetch(threading.Thread):
38 - def __init__(self, queue, output, pkgdir, depth=0):
39 - threading.Thread.__init__(self)
41 - self.packagesdir = pkgdir
43 - self.output = output
47 - (gitrepo, ref2fetch) = self.queue.get()
49 - (stdout, stderr) = gitrepo.fetch(ref2fetch, self.depth)
51 - print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
52 - self.output.put(gitrepo)
53 - except GitRepoError as e:
54 - print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
55 - self.queue.task_done()
58 + return multiprocessing.cpu_count()
59 + except NotImplementedError:
63 +def pool_worker_init():
64 + signal.signal(signal.SIGINT, signal.SIG_IGN)
67 config = UnquoteConfig(delimiters='=', interpolation=None, strict=False)
68 @@ -114,18 +96,19 @@ def getrefs(*args):
72 +def fetch_package(gitrepo, ref2fetch, options):
74 + (stdout, stderr) = gitrepo.fetch(ref2fetch, options.depth)
76 + print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
78 + except GitRepoError as e:
79 + print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
81 def fetch_packages(options, return_all=False):
82 - fetch_queue = queue.Queue()
83 - updated_repos = Store()
84 - for i in range(options.jobs):
85 - t = ThreadFetch(fetch_queue, updated_repos, options.packagesdir, options.depth)
89 - signal.signal(signal.SIGINT, signal.SIG_DFL)
91 refs = getrefs(options.branch, options.repopattern)
92 print('Read remotes data')
94 for pkgdir in sorted(refs.heads):
95 gitdir = os.path.join(options.packagesdir, pkgdir, '.git')
96 if not os.path.isdir(gitdir):
97 @@ -143,9 +126,18 @@ def fetch_packages(options, return_all=F
98 ref2fetch.append('+{}:{}/{}'.format(ref, REMOTEREFS, ref[len('refs/heads/'):]))
100 ref2fetch.append('refs/notes/*:refs/notes/*')
101 - fetch_queue.put((gitrepo, ref2fetch))
102 + args.append((gitrepo, ref2fetch, options))
104 + pool = WorkerPool(options.jobs, pool_worker_init)
106 + updated_repos = pool.starmap(fetch_package, args)
107 + except KeyboardInterrupt:
114 + updated_repos = list(filter(None, updated_repos))
118 @@ -158,26 +150,60 @@ def fetch_packages(options, return_all=F
122 - return updated_repos.items
123 + return updated_repos
125 +def checkout_package(repo, options):
127 + repo.checkout(options.checkout)
128 + except GitRepoError as e:
129 + print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
131 def checkout_packages(options):
132 if options.checkout is None:
133 options.checkout = "/".join([REMOTE_NAME, options.branch[0]])
134 fetch_packages(options)
135 refs = getrefs(options.branch, options.repopattern)
137 for pkgdir in sorted(refs.heads):
138 - repo = GitRepo(os.path.join(options.packagesdir, pkgdir))
140 - repo.checkout(options.checkout)
141 - except GitRepoError as e:
142 - print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
143 + repos.append(GitRepo(os.path.join(options.packagesdir, pkgdir)))
144 + pool = WorkerPool(options.jobs)
146 + pool.starmap(checkout_package, zip(repos, [options] * len(repos)))
147 + except KeyboardInterrupt:
153 +def clone_package(repo, options):
155 + repo.checkout('master')
156 + except GitRepoError as e:
157 + print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
159 def clone_packages(options):
160 - for repo in fetch_packages(options):
162 - repo.checkout('master')
163 - except GitRepoError as e:
164 - print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
165 + repos = fetch_packages(options)
166 + pool = WorkerPool(options.jobs)
168 + pool.starmap(clone_package, zip(repos, [options] * len(repos)))
169 + except KeyboardInterrupt:
175 +def pull_package(gitrepo, options):
176 + directory = os.path.basename(gitrepo.wtree)
178 + (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
179 + sha1 = out.decode().strip()
180 + (out, err) = gitrepo.commandexc(['rebase', sha1])
181 + for line in out.decode().splitlines():
182 + print(directory,":",line)
183 + except GitRepoError as e:
184 + for line in e.args[0].splitlines():
185 + print("{}: {}".format(directory,line))
188 def pull_packages(options):
190 @@ -189,19 +215,14 @@ def pull_packages(options):
192 repolist = fetch_packages(options, False)
193 print('--------Pulling------------')
194 - for gitrepo in repolist:
195 - directory = os.path.basename(gitrepo.wtree)
197 - (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
198 - sha1 = out.decode().strip()
199 - (out, err) = gitrepo.commandexc(['rebase', sha1])
200 - for line in out.decode().splitlines():
201 - print(directory,":",line)
202 - except GitRepoError as e:
203 - for line in e.args[0].splitlines():
204 - print("{}: {}".format(directory,line))
207 + pool = WorkerPool(options.jobs, pool_worker_init)
209 + pool.starmap(pull_package, zip(repolist, [options] * len(repolist)))
210 + except KeyboardInterrupt:
216 def list_packages(options):
217 refs = getrefs(options.branch, options.repopattern)
218 @@ -213,7 +234,7 @@ common_options.add_argument('-d', '--pac
219 default=os.path.expanduser('~/rpm/packages'))
221 common_fetchoptions = argparse.ArgumentParser(add_help=False, parents=[common_options])
222 -common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=4, type=int)
223 +common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=cpu_count(), type=int)
224 common_fetchoptions.add_argument('repopattern', nargs='*', default = ['*'])
225 common_fetchoptions.add_argument('--depth', help='depth of fetch', default=0)