2 Code for running GnuPG from Python and dealing with the results.
4 Detailed info about the format of data to/from gpg may be obtained from the
5 file DETAILS in the gnupg source.
10 import os, StringIO, popen2
13 "Used to hold information about a signature result"
17 self.fingerprint = self.creation_date = self.timestamp = None
18 self.signature_id = self.key_id = None
21 def BADSIG(self, value):
23 self.key_id, self.username = value.split(None, 1)
24 def GOODSIG(self, value):
26 self.key_id, self.username = value.split(None, 1)
def VALIDSIG(self, value):
    """Record the details reported by a VALIDSIG status line.

    value -- the remainder of the status line after the keyword.  Its
    first three whitespace-separated fields are stored as fingerprint,
    creation_date and timestamp, in that order.

    Current GnuPG releases append further fields after those three
    (expiry timestamp, signature version, algorithm ids, ...).  They are
    ignored here; unpacking the whole line into three names would raise
    ValueError as soon as such a line is seen.

    Raises ValueError if fewer than three fields are present.
    """
    fields = value.split()
    # Only the leading three fields are kept; anything after is optional
    # extra information this class does not record.
    self.fingerprint, self.creation_date, self.timestamp = fields[:3]
def SIG_ID(self, value):
    """Store the three fields of a SIG_ID status line.

    value is split on whitespace into exactly three parts, which are
    saved as signature_id, creation_date and timestamp (in that order).
    Any other number of parts raises ValueError and stores nothing.
    """
    sig_id, created, stamp = value.split()
    self.signature_id = sig_id
    self.creation_date = created
    self.timestamp = stamp
36 "Used to hold information about a key import result"
38 counts = '''count no_user_id imported imported_rsa unchanged
39 n_uids n_subk n_sigs n_revoc sec_read sec_imported
40 sec_dups not_imported'''.split()
44 for result in self.counts:
45 setattr(self, result, None)
def NODATA(self, value):
    """Note that GPG reported NODATA for the import.

    The status argument is not inspected.  One entry is appended to
    self.results with no fingerprint, problem code '0' and a fixed
    explanatory text.
    """
    entry = dict(fingerprint=None, problem='0')
    entry['text'] = 'No valid data found'
    self.results.append(entry)
50 def IMPORTED(self, value):
51 # this duplicates info we already see in import_ok and import_problem
54 '0': 'Not actually changed',
55 '1': 'Entirely new key',
57 '4': 'New signatures',
59 '16': 'Contains private key',
def IMPORT_OK(self, value):
    """Record a successful key import reported by an IMPORT_OK line.

    value -- the remainder of the status line: a decimal reason code,
    optionally followed by the key fingerprint.

    An entry with 'fingerprint', 'ok' (the reason code as received) and
    'text' (a human-readable explanation) is appended to self.results.

    The reason code is a bit mask, so GPG may report a sum of flags
    (e.g. 17 = 1 + 16).  A code that is not itself a key of
    self.ok_reason is broken down into the flags that are, and their
    descriptions are joined with ', '.  When the fingerprint is absent
    the '<unknown>' placeholder is stored, the same marker
    IMPORT_PROBLEM uses.

    Raises ValueError if the line is empty and KeyError if the reason
    code cannot be matched against self.ok_reason at all.
    """
    parts = value.split()
    if not parts:
        raise ValueError('IMPORT_OK status line has no reason code')
    reason = parts[0]
    if len(parts) > 1:
        fingerprint = parts[1]
    else:
        fingerprint = '<unknown>'

    text = self.ok_reason.get(reason)
    if text is None:
        # Not a single known code: treat it as a combination of flags.
        try:
            flags = int(reason)
        except ValueError:
            flags = 0
        labels = [self.ok_reason[code]
                  for code in sorted(self.ok_reason, key=int)
                  if int(code) & flags]
        if not labels:
            # Same failure the plain table lookup would have produced.
            raise KeyError(reason)
        text = ', '.join(labels)

    self.results.append({'fingerprint': fingerprint,
                         'ok': reason, 'text': text})
66 '0': 'No specific reason given',
67 '1': 'Invalid Certificate',
68 '2': 'Issuer Certificate missing',
69 '3': 'Certificate Chain too long',
70 '4': 'Error storing certificate',
72 def IMPORT_PROBLEM(self, value):
74 reason, fingerprint = value.split()
77 fingerprint = '<unknown>'
78 self.results.append({'fingerprint': fingerprint,
79 'problem': reason, 'text': self.problem_reason[reason]})
def IMPORT_RES(self, value):
    """Store the counters of an IMPORT_RES status line as attributes.

    value is split on whitespace; the n-th number is converted to int
    and assigned to the attribute named by the n-th entry of
    self.counts.  Numbers beyond len(self.counts) are ignored; a line
    with too few numbers raises IndexError.
    """
    numbers = value.split()
    for position, attr_name in enumerate(self.counts):
        setattr(self, attr_name, int(numbers[position]))
87 l.append('%d imported'%self.imported)
89 l.append('%d not imported'%self.not_imported)
93 ''' Parse a --list-keys output
95 Handle pub and uid (relating the latter to the former).
97 Don't care about (info from src/DETAILS):
99 crt = X.509 certificate
100 crs = X.509 certificate and private key available
101 sub = subkey (secondary key)
103 ssb = secret subkey (secondary key)
104 uat = user attribute (same as user id except for field 10).
106 rev = revocation signature
107 fpr = fingerprint: (fingerprint is in field 10)
108 pkd = public key data (special field format, see below)
109 grp = reserved for gpgsm
120 self.pk = {'keyid': keyid, 'date': date, 'uids': [uid]}
121 self.pub_keys.append(self.pk)
124 self.pk['uids'].append(args[9])
126 class EncryptedMessage:
127 ''' Handle a --encrypt command
132 def BEGIN_ENCRYPTION(self, value):
134 def END_ENCRYPTION(self, value):
137 class DecryptedMessage(object):
138 ''' Handle a --decrypt command
143 def DECRYPTION_FAILED(self,value):
144 raise ValueError, "Decryption failed"
146 def empty(self,name):
149 def __getattr__(self,name):
151 return object.__getattr__(self,name)
152 except AttributeError:
157 # Default path used for searching for the GPG binary, when the
158 # PATH environment variable isn't set.
159 DEFAULT_PATH = ['/bin', '/usr/bin', '/usr/local/bin']
161 def __init__(self, gpg_binary=None, keyring=None):
162 """Initialize an object instance. Options are:
164 gpg_binary -- full pathname for GPG binary. If not supplied,
165 the current value of PATH will be searched, falling back to the
166 DEFAULT_PATH class variable if PATH isn't available.
168 keyring -- full pathname to the public keyring to use in place of
169 the default "~/.gnupg/pubring.gpg".
171 # If needed, look for the gpg binary along the path
172 if gpg_binary is None:
173 if os.environ.has_key('PATH'):
174 path = os.environ['PATH']
175 path = path.split(os.pathsep)
177 path = self.DEFAULT_PATH
180 fullname = os.path.join(dir, 'gpg')
181 if os.path.exists( fullname ):
182 gpg_binary = fullname
185 raise ValueError, ("Couldn't find 'gpg' binary on path"
188 self.gpg_binary = gpg_binary
189 self.keyring = keyring
191 def _open_subprocess(self, *args):
192 # Internal method: open a pipe to a GPG subprocess and return
193 # the file objects for communicating with it.
194 cmd = [self.gpg_binary, '--status-fd 2']
196 cmd.append('--keyring "%s" --no-default-keyring'%self.keyring)
201 child_stdout, child_stdin, child_stderr = popen2.popen3(cmd)
202 return child_stdout, child_stdin, child_stderr
204 def _read_response(self, child_stdout, response):
205 # Internal method: reads all the output from GPG, taking notice
206 # only of lines that begin with the magic [GNUPG:] prefix.
208 # Calls methods on the response object for each valid token found,
209 # with the arg being the remainder of the status line.
211 line = child_stdout.readline()
214 if line[0:9] == '[GNUPG:] ':
215 # Chop off the prefix
217 L = line.split(None, 1)
223 getattr(response, keyword)(value)
225 def _handle_gigo(self, args, file, result):
226 # Handle a basic data call - pass data to GPG, handle the output
227 # including status information. Garbage In, Garbage Out :)
228 child_stdout, child_stdin, child_stderr = self._open_subprocess(*args)
230 # Copy the file to the GPG subprocess
232 data = file.read(1024)
234 child_stdin.write(data)
237 # Get the response information
238 resp = self._read_response(child_stderr, result)
240 # Read the contents of the file from GPG's stdout
243 data = child_stdout.read(1024)
245 result.data = result.data + data
251 # SIGNATURE VERIFICATION METHODS
def verify(self, data):
    """Verify the signature on the contents of the string 'data'.

    The string is wrapped in an in-memory file object and handed to
    verify_file(); whatever that returns is returned unchanged.
    """
    return self.verify_file(StringIO.StringIO(data))
258 def verify_file(self, file):
259 "Verify the signature on the contents of the file-like object 'file'"
261 self._handle_gigo([], file, sig)
267 def import_key(self, key_data):
268 ''' import the key_data into our keyring '''
269 child_stdout, child_stdin, child_stderr = \
270 self._open_subprocess('--import')
272 child_stdin.write(key_data)
275 # Get the response information
276 result = ImportResult()
277 resp = self._read_response(child_stderr, result)
282 ''' list the keys currently in the keyring '''
283 child_stdout, child_stdin, child_stderr = \
284 self._open_subprocess('--list-keys --with-colons')
287 # TODO: there might be some status thingumy here I should handle...
289 # Get the response information
290 result = ListResult()
291 valid_keywords = 'pub uid'.split()
293 line = child_stdout.readline()
296 L = line.strip().split(':')
300 if keyword in valid_keywords:
301 getattr(result, keyword)(L)
308 def encrypt_file(self, file, recipients):
309 "Encrypt the message read from the file-like object 'file'"
310 args = ['--encrypt --armor']
311 for recipient in recipients:
312 args.append('--recipient %s'%recipient)
313 result = EncryptedMessage()
314 self._handle_gigo(args, file, result)
def encrypt(self, data, recipients):
    """Encrypt the message contained in the string 'data'.

    The string is wrapped in an in-memory file object and passed, with
    'recipients', to encrypt_file(); its result is returned unchanged.
    """
    return self.encrypt_file(StringIO.StringIO(data), recipients)
323 # Not yet implemented, because I don't need these methods
324 # The methods certainly don't have all the parameters they'd need.
325 def sign(self, data):
326 "Sign the contents of the string 'data'"
329 def sign_file(self, file):
330 "Sign the contents of the file-like object 'file'"
333 def decrypt_file(self, file):
334 "Decrypt the message read from the file-like object 'file'"
335 args = ['--decrypt --armor']
336 result = DecryptedMessage()
337 self._handle_gigo(args, file, result)
def decrypt(self, data):
    """Decrypt the message contained in the string 'data'.

    The string is wrapped in an in-memory file object and passed to
    decrypt_file(); its result is returned unchanged.
    """
    return self.decrypt_file(StringIO.StringIO(data))
345 if __name__ == '__main__':
347 if len(sys.argv) == 1:
348 print 'Usage: GPG.py <signed file>'
351 obj = GPGSubprocess()
352 file = open(sys.argv[1], 'rb')
353 sig = obj.verify_file( file )