]> git.pld-linux.org Git - packages/python-gpg.git/blob - GPG.py
- drop md5sum, the file is in git
[packages/python-gpg.git] / GPG.py
1 '''
2 Code for running GnuPG from Python and dealing with the results.
3
4 Detailed info about the format of data to/from gpg may be obtained from the
5 file DETAILS in the gnupg source.
6 '''
7
8 __rcsid__ = '$Id$'
9
10 import os, StringIO, popen2
11
12 class Signature:
13     "Used to hold information about a signature result"
14
15     def __init__(self):
16         self.valid = 0
17         self.fingerprint = self.creation_date = self.timestamp = None
18         self.signature_id = self.key_id = None
19         self.username = None
20
21     def BADSIG(self, value):
22         self.valid = 0
23         self.key_id, self.username = value.split(None, 1)
24     def GOODSIG(self, value):
25         self.valid = 1
26         self.key_id, self.username = value.split(None, 1)
27     def VALIDSIG(self, value):
28         self.fingerprint, self.creation_date, self.timestamp = value.split()
29     def SIG_ID(self, value):
30         self.signature_id, self.creation_date, self.timestamp = value.split()
31
32     def is_valid(self):
33         return self.valid
34  
35 class ImportResult:
36     "Used to hold information about a key import result"
37
38     counts = '''count no_user_id imported imported_rsa unchanged
39             n_uids n_subk n_sigs n_revoc sec_read sec_imported
40             sec_dups not_imported'''.split()
41     def __init__(self):
42         self.imported = []
43         self.results = []
44         for result in self.counts:
45             setattr(self, result, None)
46     
47     def NODATA(self, value):
48         self.results.append({'fingerprint': None,
49             'problem': '0', 'text': 'No valid data found'})
50     def IMPORTED(self, value):
51         # this duplicates info we already see in import_ok and import_problem
52         pass
53     ok_reason = {
54         '0': 'Not actually changed',
55         '1': 'Entirely new key',
56         '2': 'New user IDs',
57         '4': 'New signatures',
58         '8': 'New subkeys',
59         '16': 'Contains private key',
60     }
61     def IMPORT_OK(self, value):
62         reason, fingerprint = value.split()
63         self.results.append({'fingerprint': fingerprint,
64             'ok': reason, 'text': self.ok_reason[reason]})
65     problem_reason = {
66         '0': 'No specific reason given',
67         '1': 'Invalid Certificate',
68         '2': 'Issuer Certificate missing',
69         '3': 'Certificate Chain too long',
70         '4': 'Error storing certificate',
71     }
72     def IMPORT_PROBLEM(self, value):
73         try:
74             reason, fingerprint = value.split()
75         except:
76             reason = value
77             fingerprint = '<unknown>'
78         self.results.append({'fingerprint': fingerprint,
79             'problem': reason, 'text': self.problem_reason[reason]})
80     def IMPORT_RES(self, value):
81         import_res = value.split()
82         for i in range(len(self.counts)):
83             setattr(self, self.counts[i], int(import_res[i]))
84
85     def summary(self):
86         l = []
87         l.append('%d imported'%self.imported)
88         if self.not_imported:
89             l.append('%d not imported'%self.not_imported)
90         return ', '.join(l)
91
92 class ListResult:
93     ''' Parse a --list-keys output
94
95         Handle pub and uid (relating the latter to the former).
96
97         Don't care about (info from src/DETAILS):
98
99         crt = X.509 certificate
100         crs = X.509 certificate and private key available
101         sub = subkey (secondary key)
102         sec = secret key
103         ssb = secret subkey (secondary key)
104         uat = user attribute (same as user id except for field 10).
105         sig = signature
106         rev = revocation signature
107         fpr = fingerprint: (fingerprint is in field 10)
108         pkd = public key data (special field format, see below)
109         grp = reserved for gpgsm
110         rvk = revocation key
111     '''
112     def __init__(self):
113         self.pub_keys = []
114         self.pk = None
115
116     def pub(self, args):
117         keyid = args[4]
118         date = args[5]
119         uid = args[9]
120         self.pk = {'keyid': keyid, 'date': date, 'uids': [uid]}
121         self.pub_keys.append(self.pk)
122
123     def uid(self, args):
124         self.pk['uids'].append(args[9])
125
126 class EncryptedMessage:
127     ''' Handle a --encrypt command
128     '''
129     def __init__(self):
130         self.data = ''
131
132     def BEGIN_ENCRYPTION(self, value):
133         pass
134     def END_ENCRYPTION(self, value):
135         pass
136
137 class DecryptedMessage(object):
138     ''' Handle a --decrypt command
139     '''
140     def __init__(self):
141         self.data = ''
142
143     def DECRYPTION_FAILED(self,value):
144         raise ValueError, "Decryption failed"
145
146     def empty(self,name):
147         pass
148
149     def __getattr__(self,name):
150         try:
151             return object.__getattr__(self,name)
152         except AttributeError:
153             return self.empty
154
155 class GPGSubprocess:
156
157     # Default path used for searching for the GPG binary, when the
158     # PATH environment variable isn't set.
159     DEFAULT_PATH = ['/bin', '/usr/bin', '/usr/local/bin']
160     
161     def __init__(self, gpg_binary=None, keyring=None):
162         """Initialize an object instance.  Options are:
163
164         gpg_binary -- full pathname for GPG binary.  If not supplied,
165         the current value of PATH will be searched, falling back to the
166         DEFAULT_PATH class variable if PATH isn't available.
167
168         keyring -- full pathname to the public keyring to use in place of
169         the default "~/.gnupg/pubring.gpg".
170         """
171         # If needed, look for the gpg binary along the path
172         if gpg_binary is None:
173             if os.environ.has_key('PATH'):
174                 path = os.environ['PATH']
175                 path = path.split(os.pathsep)
176             else:
177                 path = self.DEFAULT_PATH
178
179             for dir in path:
180                 fullname = os.path.join(dir, 'gpg')
181                 if os.path.exists( fullname ):
182                     gpg_binary = fullname
183                     break
184             else:
185                 raise ValueError, ("Couldn't find 'gpg' binary on path"
186                                    + repr(path) )
187             
188         self.gpg_binary = gpg_binary
189         self.keyring = keyring
190
191     def _open_subprocess(self, *args):
192         # Internal method: open a pipe to a GPG subprocess and return
193         # the file objects for communicating with it.
194         cmd = [self.gpg_binary, '--status-fd 2']
195         if self.keyring:
196             cmd.append('--keyring "%s" --no-default-keyring'%self.keyring)
197
198         cmd.extend(args)
199         cmd = ' '.join(cmd)
200
201         child_stdout, child_stdin, child_stderr = popen2.popen3(cmd)
202         return child_stdout, child_stdin, child_stderr
203
204     def _read_response(self, child_stdout, response):
205         # Internal method: reads all the output from GPG, taking notice
206         # only of lines that begin with the magic [GNUPG:] prefix.
207         # 
208         # Calls methods on the response object for each valid token found,
209         # with the arg being the remainder of the status line.
210         while 1:
211             line = child_stdout.readline()
212             if line == "": break
213             line = line.rstrip()
214             if line[0:9] == '[GNUPG:] ':
215                 # Chop off the prefix
216                 line = line[9:]
217                 L = line.split(None, 1)
218                 keyword = L[0]
219                 if len(L) > 1:
220                     value = L[1]
221                 else:
222                     value = ""
223                 getattr(response, keyword)(value)
224
225     def _handle_gigo(self, args, file, result):
226         # Handle a basic data call - pass data to GPG, handle the output
227         # including status information. Garbage In, Garbage Out :)
228         child_stdout, child_stdin, child_stderr = self._open_subprocess(*args)
229
230         # Copy the file to the GPG subprocess
231         while 1:
232             data = file.read(1024)
233             if data == "": break
234             child_stdin.write(data)
235         child_stdin.close()
236
237         # Get the response information
238         resp = self._read_response(child_stderr, result)
239
240         # Read the contents of the file from GPG's stdout
241         result.data = ""
242         while 1:
243             data = child_stdout.read(1024)
244             if data == "": break
245             result.data = result.data + data
246
247         return result
248     
249
250     #
251     # SIGNATURE VERIFICATION METHODS
252     #
253     def verify(self, data):
254         "Verify the signature on the contents of the string 'data'"
255         file = StringIO.StringIO(data)
256         return self.verify_file(file)
257     
258     def verify_file(self, file):
259         "Verify the signature on the contents of the file-like object 'file'"
260         sig = Signature()
261         self._handle_gigo([], file, sig)
262         return sig
263
264     #
265     # KEY MANAGEMENT
266     #
267     def import_key(self, key_data):
268         ''' import the key_data into our keyring '''
269         child_stdout, child_stdin, child_stderr = \
270             self._open_subprocess('--import')
271
272         child_stdin.write(key_data)
273         child_stdin.close()
274
275         # Get the response information
276         result = ImportResult()
277         resp = self._read_response(child_stderr, result)
278
279         return result
280
281     def list_keys(self):
282         ''' list the keys currently in the keyring '''
283         child_stdout, child_stdin, child_stderr = \
284             self._open_subprocess('--list-keys --with-colons')
285         child_stdin.close()
286
287         # TODO: there might be some status thingumy here I should handle...
288
289         # Get the response information
290         result = ListResult()
291         valid_keywords = 'pub uid'.split()
292         while 1:
293             line = child_stdout.readline()
294             if not line:
295                 break
296             L = line.strip().split(':')
297             if not L:
298                 continue
299             keyword = L[0]
300             if keyword in valid_keywords:
301                 getattr(result, keyword)(L)
302
303         return result
304
305     #
306     # ENCRYPTING DATA
307     #
308     def encrypt_file(self, file, recipients):
309         "Encrypt the message read from the file-like object 'file'"
310         args = ['--encrypt --armor']
311         for recipient in recipients:
312             args.append('--recipient %s'%recipient)
313         result = EncryptedMessage()
314         self._handle_gigo(args, file, result)
315         return result
316
317     def encrypt(self, data, recipients):
318         "Encrypt the message contained in the string 'data'"
319         file = StringIO.StringIO(data)
320         return self.encrypt_file(file, recipients)
321
322
323     # Not yet implemented, because I don't need these methods
324     # The methods certainly don't have all the parameters they'd need.
325     def sign(self, data):
326         "Sign the contents of the string 'data'"
327         pass
328
329     def sign_file(self, file):
330         "Sign the contents of the file-like object 'file'"
331         pass
332
333     def decrypt_file(self, file):
334         "Decrypt the message read from the file-like object 'file'"
335         args = ['--decrypt --armor']
336         result = DecryptedMessage()
337         self._handle_gigo(args, file, result)
338         return result
339
340     def decrypt(self, data):
341         "Decrypt the message contained in the string 'data'"
342         file = StringIO.StringIO(data)
343         return self.decrypt_file(file)
344     
345 if __name__ == '__main__':
346     import sys
347     if len(sys.argv) == 1:
348         print 'Usage: GPG.py <signed file>'
349         sys.exit()
350
351     obj = GPGSubprocess()
352     file = open(sys.argv[1], 'rb')
353     sig = obj.verify_file( file )
354     print sig.__dict__
This page took 0.12989 seconds and 3 git commands to generate.