]> git.pld-linux.org Git - packages/git-core-slug.git/blob - git-core-slug-parallel.patch
d6266c8f2c774b2df0f95b9b1c102bf4266efba3
[packages/git-core-slug.git] / git-core-slug-parallel.patch
1 --- a/slug.py   2014-10-19 18:07:38.000000000 +0200
2 +++ b/slug.py   2014-11-20 22:31:39.005919529 +0100
3 @@ -7,26 +7,18 @@ import os
4  import shutil
5  import subprocess
6  import queue
7 -import threading
8 -
9 +import multiprocessing
10  import argparse
11  
12  import signal
13  import configparser
14  
15 +from multiprocessing import Pool as WorkerPool
16 +
17  from git_slug.gitconst import GITLOGIN, GITSERVER, GIT_REPO, GIT_REPO_PUSH, REMOTE_NAME, REMOTEREFS
18  from git_slug.gitrepo import GitRepo, GitRepoError
19  from git_slug.refsdata import GitArchiveRefsData, NoMatchedRepos, RemoteRefsError
20  
21 -class Store():
22 -    def __init__(self):
23 -        self.lock = threading.Lock()
24 -        self.items = []
25 -
26 -    def put(self, item):
27 -        with self.lock:
28 -            self.items.append(item)
29 -
30  class UnquoteConfig(configparser.ConfigParser):
31      def get(self, section, option, **kwargs):
32          value = super().get(section, option, **kwargs)
33 @@ -43,25 +35,15 @@ class DelAppend(argparse._AppendAction):
34          item.append(values)
35          setattr(namespace, self.dest, item)
36  
37 -class ThreadFetch(threading.Thread):
38 -    def __init__(self, queue, output, pkgdir, depth=0):
39 -        threading.Thread.__init__(self)
40 -        self.queue = queue
41 -        self.packagesdir = pkgdir
42 -        self.depth = depth
43 -        self.output = output
44 -
45 -    def run(self):
46 -        while True:
47 -            (gitrepo, ref2fetch) = self.queue.get()
48 -            try:
49 -                (stdout, stderr) = gitrepo.fetch(ref2fetch, self.depth)
50 -                if stderr != b'':
51 -                    print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
52 -                    self.output.put(gitrepo)
53 -            except GitRepoError as e:
54 -                print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
55 -            self.queue.task_done()
56 +def cpu_count():
57 +    try:
58 +        return multiprocessing.cpu_count()
59 +    except NotImplementedError:
60 +        pass
61 +    return 4
62 +
63 +def pool_worker_init():
64 +    signal.signal(signal.SIGINT, signal.SIG_IGN)
65  
66  def readconfig(path):
67      config = UnquoteConfig(delimiters='=', interpolation=None, strict=False)
68 @@ -114,18 +96,19 @@ def getrefs(*args):
69          sys.exit(2)
70      return refs
71  
72 +def fetch_package(gitrepo, ref2fetch, options):
73 +    try:
74 +        (stdout, stderr) = gitrepo.fetch(ref2fetch, options.depth)
75 +        if stderr != b'':
76 +            print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
77 +            return gitrepo
78 +    except GitRepoError as e:
79 +        print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
80 +         
81  def fetch_packages(options, return_all=False):
82 -    fetch_queue = queue.Queue()
83 -    updated_repos = Store()
84 -    for i in range(options.jobs):
85 -        t = ThreadFetch(fetch_queue, updated_repos, options.packagesdir, options.depth)
86 -        t.setDaemon(True)
87 -        t.start()
88 -
89 -    signal.signal(signal.SIGINT, signal.SIG_DFL)
90 -
91      refs = getrefs(options.branch, options.repopattern)
92      print('Read remotes data')
93 +    args = []
94      for pkgdir in sorted(refs.heads):
95          gitdir = os.path.join(options.packagesdir, pkgdir, '.git')
96          if not os.path.isdir(gitdir):
97 @@ -143,9 +126,18 @@ def fetch_packages(options, return_all=F
98                  ref2fetch.append('+{}:{}/{}'.format(ref, REMOTEREFS, ref[len('refs/heads/'):]))
99          if ref2fetch:
100              ref2fetch.append('refs/notes/*:refs/notes/*')
101 -            fetch_queue.put((gitrepo, ref2fetch))
102 +            args.append((gitrepo, ref2fetch, options))
103 +
104 +    pool = WorkerPool(options.jobs, pool_worker_init)
105 +    try:
106 +        updated_repos = pool.starmap(fetch_package, args)
107 +    except KeyboardInterrupt:
108 +        pool.terminate()
109 +    else:
110 +        pool.close()
111 +    pool.join()
112  
113 -    fetch_queue.join()
114 +    updated_repos = list(filter(None, updated_repos))
115  
116      if options.prune:
117          refs = getrefs('*')
118 @@ -158,26 +150,60 @@ def fetch_packages(options, return_all=F
119      if return_all:
120          return refs.heads
121      else:
122 -        return updated_repos.items
123 +        return updated_repos
124 +
125 +def checkout_package(repo, options):
126 +    try:
127 +        repo.checkout(options.checkout)
128 +    except GitRepoError as e:
129 +        print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
130  
131  def checkout_packages(options):
132      if options.checkout is None:
133          options.checkout = "/".join([REMOTE_NAME, options.branch[0]])
134      fetch_packages(options)
135      refs = getrefs(options.branch, options.repopattern)
136 +    repos = []
137      for pkgdir in sorted(refs.heads):
138 -        repo = GitRepo(os.path.join(options.packagesdir, pkgdir))
139 -        try:
140 -            repo.checkout(options.checkout)
141 -        except GitRepoError as e:
142 -            print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
143 +        repos.append(GitRepo(os.path.join(options.packagesdir, pkgdir)))
144 +    pool = WorkerPool(options.jobs)
145 +    try:
146 +        pool.starmap(checkout_package, zip(repos, [options] * len(repos)))
147 +    except KeyboardInterrupt:
148 +        pool.terminate()
149 +    else:
150 +        pool.close()
151 +    pool.join()
152 +
153 +def clone_package(repo, options):
154 +    try:
155 +        repo.checkout('master')
156 +    except GitRepoError as e:
157 +        print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
158  
159  def clone_packages(options):
160 -    for repo in fetch_packages(options):
161 -        try:
162 -            repo.checkout('master')
163 -        except GitRepoError as e:
164 -            print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
165 +    repos = fetch_packages(options)
166 +    pool = WorkerPool(options.jobs)
167 +    try:
168 +        pool.starmap(clone_package, zip(repos, [options] * len(repos)))
169 +    except KeyboardInterrupt:
170 +        pool.terminate()
171 +    else:
172 +        pool.close()
173 +    pool.join()
174 +
175 +def pull_package(gitrepo, options):
176 +    directory = os.path.basename(gitrepo.wtree)
177 +    try:
178 +        (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
179 +        sha1 = out.decode().strip()
180 +        (out, err) = gitrepo.commandexc(['rebase', sha1])
181 +        for line in out.decode().splitlines():
182 +            print(directory,":",line)
183 +    except GitRepoError as e:
184 +        for line in e.args[0].splitlines():
185 +            print("{}: {}".format(directory,line))
186 +        pass
187  
188  def pull_packages(options):
189      repolist = []
190 @@ -189,19 +215,14 @@ def pull_packages(options):
191      else:
192          repolist = fetch_packages(options, False)
193      print('--------Pulling------------')
194 -    for gitrepo in repolist:
195 -        directory = os.path.basename(gitrepo.wtree)
196 -        try:
197 -            (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
198 -            sha1 = out.decode().strip()
199 -            (out, err) = gitrepo.commandexc(['rebase', sha1])
200 -            for line in out.decode().splitlines():
201 -                print(directory,":",line)
202 -        except GitRepoError as e:
203 -            for line in e.args[0].splitlines():
204 -                print("{}: {}".format(directory,line))
205 -            pass
206 -
207 +    pool = WorkerPool(options.jobs, pool_worker_init)
208 +    try:
209 +        pool.starmap(pull_package, zip(repolist, [options] * len(repolist)))
210 +    except KeyboardInterrupt:
211 +        pool.terminate()
212 +    else:
213 +        pool.close()
214 +    pool.join()
215  
216  def list_packages(options):
217      refs = getrefs(options.branch, options.repopattern)
218 @@ -213,7 +234,7 @@ common_options.add_argument('-d', '--pac
219      default=os.path.expanduser('~/rpm/packages'))
220  
221  common_fetchoptions = argparse.ArgumentParser(add_help=False, parents=[common_options])
222 -common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=4, type=int)
223 +common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=cpu_count(), type=int)
224  common_fetchoptions.add_argument('repopattern', nargs='*', default = ['*'])
225  common_fetchoptions.add_argument('--depth', help='depth of fetch', default=0)
226  
This page took 0.05423 seconds and 2 git commands to generate.