"""
This file is part of the web2py Web Framework
Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>
License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html)

This file specifically includes utilities for security.
"""
11
import threading
import struct
import hashlib
import hmac
import uuid
import random
import time
import os
import re
import sys
import logging
import socket
import base64
import zlib

# Major interpreter version; selects the Python 2 or Python 3 code paths.
python_version = sys.version_info[0]

if python_version == 2:
    import cPickle as pickle
else:
    import pickle


# Use Crypto.Cipher.AES when it is installed, otherwise fall back to
# contrib.aes.
try:
    from Crypto.Cipher import AES
except ImportError:
    import contrib.aes as AES

# pbkdf2_hex is optional; HAVE_PBKDF2 records whether an import succeeded.
try:
    from contrib.pbkdf2 import pbkdf2_hex
    HAVE_PBKDF2 = True
except ImportError:
    try:
        from .pbkdf2 import pbkdf2_hex
        HAVE_PBKDF2 = True
    except (ImportError, ValueError):
        HAVE_PBKDF2 = False

logger = logging.getLogger("web2py")
51
def AES_new(key, IV=None):
    """
    Returns an AES cipher object in CBC mode and the IV it was built with.

    key: the AES key, handed unchanged to AES.new()
    IV: the initialization vector; when None a fresh one is taken from
        fast_urandom16()

    Returns the tuple (cipher, IV).
    """
    if IV is None:
        IV = fast_urandom16()

    return AES.new(key, AES.MODE_CBC, IV), IV
58
59
def compare(a, b):
    """
    Compares two strings without stopping at the first mismatch, so the
    running time does not reveal where they differ.

    a, b: text strings or byte strings

    Returns True when both hold the same characters, False otherwise.
    Inputs of different length return False immediately.
    """
    if len(a) != len(b):
        return False

    def code(item):
        # iterating bytes on Python 3 yields ints, everything else yields
        # one-character strings that still need ord()
        return item if isinstance(item, int) else ord(item)

    result = 0
    for x, y in zip(a, b):
        result |= code(x) ^ code(y)
    return result == 0
68
69
def md5_hash(text):
    """
    Generate a md5 hash with the given text

    text: byte string, or text string (encoded as UTF-8 before hashing)

    Returns the digest as a hexadecimal string.
    """
    if not isinstance(text, bytes):
        # hashlib on Python 3 only accepts bytes
        text = text.encode('utf8')
    return hashlib.md5(text).hexdigest()
73
74
def _hash_bytes(value):
    """ Returns value as bytes; text is encoded as UTF-8 """
    if isinstance(value, bytes):
        return value
    return value.encode('utf8')


def simple_hash(text, key='', salt='', digest_alg='md5'):
    """
    Generates hash with the given text using the specified
    digest hashing algorithm

    text, key, salt: byte strings or text strings (text is encoded as
        UTF-8 so the same call works on Python 2 and Python 3)
    digest_alg: one of
        - a callable: it is called with text + key + salt and must return
          an object with hexdigest()
        - a string starting with 'pbkdf2', parsed as
          'pbkdf2(iterations,keylen,alg)' and delegated to pbkdf2_hex
        - any other algorithm name: HMAC keyed with key + salt when key is
          given, otherwise a plain hashlib digest of text + salt

    Returns the hexadecimal digest (or whatever pbkdf2_hex returns).
    Raises RuntimeError when digest_alg is empty or None.
    """
    if not digest_alg:
        raise RuntimeError("simple_hash with digest_alg=None")
    text = _hash_bytes(text)
    key = _hash_bytes(key)
    salt = _hash_bytes(salt)
    if not isinstance(digest_alg, str):
        # caller supplied the hash constructor directly
        h = digest_alg(text + key + salt)
    elif digest_alg.startswith('pbkdf2'):
        # strip the leading 'pbkdf2(' and the trailing ')'
        iterations, keylen, alg = digest_alg[7:-1].split(',')
        return pbkdf2_hex(text, salt, int(iterations),
                          int(keylen), get_digest(alg))
    elif key:
        # keyed hash
        h = hmac.new(key + salt, text, get_digest(digest_alg))
    else:
        # plain salted digest
        h = hashlib.new(digest_alg)
        h.update(text + salt)
    return h.hexdigest()
95
96
def get_digest(value):
    """
    Returns a hashlib digest algorithm from a string

    value: an algorithm name ('md5', 'sha1', 'sha224', 'sha256', 'sha384'
        or 'sha512', in any letter case); anything that is not a str is
        returned unchanged

    Raises ValueError for an unknown algorithm name.
    """
    if not isinstance(value, str):
        return value
    value = value.lower()
    if value in ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512'):
        return getattr(hashlib, value)
    raise ValueError("Invalid digest algorithm: %s" % value)
118
# Maps the length of a hexadecimal digest (digest bits // 4) to the name of
# the algorithm producing it. Floor division keeps the keys integers on
# Python 3 as well.
DIGEST_ALG_BY_SIZE = {
    128 // 4: 'md5',
    160 // 4: 'sha1',
    224 // 4: 'sha224',
    256 // 4: 'sha256',
    384 // 4: 'sha384',
    512 // 4: 'sha512',
}
127
128
def pad(s, n=32, padchar=' '):
    """
    Appends padchar to s until its length is a multiple of n.

    Between 1 and n padding characters are always added, so a string whose
    length already is a multiple of n grows by a full n.

    s: text string or byte string
    n: block size (default 32)
    padchar: the padding character; a text padchar is converted to bytes
        when s is a byte string
    """
    if isinstance(s, bytes) and not isinstance(padchar, bytes):
        padchar = padchar.encode('latin-1')
    return s + (n - len(s) % n) * padchar
131
132
def secure_dumps(data, encryption_key, hash_key=None, compression_level=None):
    """
    Pickles data, optionally compresses it, encrypts it with AES-CBC and
    signs the result.

    data: any picklable object
    encryption_key: text or byte string; at most its first 32 bytes are
        used, space-padded to 32 bytes
    hash_key: key for the HMAC signature; defaults to the hexadecimal SHA1
        of encryption_key
    compression_level: when set, the pickle is zlib-compressed at this
        level before encryption

    Returns a byte string of the form 'signature:payload', where payload is
    the url-safe base64 of IV + ciphertext and signature is the hexadecimal
    HMAC-MD5 of payload.
    """
    if not isinstance(encryption_key, bytes):
        encryption_key = encryption_key.encode('utf8')
    if not hash_key:
        hash_key = hashlib.sha1(encryption_key).hexdigest()
    if not isinstance(hash_key, bytes):
        hash_key = hash_key.encode('utf8')
    dump = pickle.dumps(data)
    if compression_level:
        dump = zlib.compress(dump, compression_level)
    # pad() always appends at least one character, so a 32-byte key would
    # grow to 64 bytes; cut back to the 32 bytes AES can take
    key = pad(encryption_key[:32], padchar=b' ')[:32]
    cipher, IV = AES_new(key)
    encrypted_data = base64.urlsafe_b64encode(
        IV + cipher.encrypt(pad(dump, padchar=b' ')))
    # md5 was the implicit hmac default on Python 2; naming it keeps old
    # signatures valid and satisfies Python 3, where digestmod is mandatory
    signature = hmac.new(hash_key, encrypted_data, hashlib.md5).hexdigest()
    if not isinstance(signature, bytes):
        signature = signature.encode('ascii')
    return signature + b':' + encrypted_data
144
145
def secure_loads(data, encryption_key, hash_key=None, compression_level=None):
    """
    Verifies, decrypts and unpickles a value produced by secure_dumps.

    data: text or byte string of the form 'signature:payload'
    encryption_key, hash_key, compression_level: must match the values
        given to secure_dumps

    Returns the original object, or None when data has no ':', when the
    signature does not match, or when decryption/unpickling raises
    TypeError or pickle.UnpicklingError.
    """
    if not isinstance(data, bytes):
        data = data.encode('utf8')
    if b':' not in data:
        return None
    if not isinstance(encryption_key, bytes):
        encryption_key = encryption_key.encode('utf8')
    if not hash_key:
        hash_key = hashlib.sha1(encryption_key).hexdigest()
    if not isinstance(hash_key, bytes):
        hash_key = hash_key.encode('utf8')
    signature, encrypted_data = data.split(b':', 1)
    # md5 was the implicit hmac default on Python 2; Python 3 requires it
    actual_signature = hmac.new(hash_key, encrypted_data,
                                hashlib.md5).hexdigest()
    if not isinstance(signature, str):
        # hand compare() two native strings on Python 3
        signature = signature.decode('latin-1')
    if not compare(signature, actual_signature):
        return None
    # same key derivation as secure_dumps: pad, then cut back to 32 bytes
    key = pad(encryption_key[:32], padchar=b' ')[:32]
    encrypted_data = base64.urlsafe_b64decode(encrypted_data)
    IV, encrypted_data = encrypted_data[:16], encrypted_data[16:]
    cipher, _ = AES_new(key, IV=IV)
    try:
        data = cipher.decrypt(encrypted_data)
        if compression_level:
            # a zlib stream may itself end in 0x20 bytes, so the space
            # padding must not be stripped; the decompressor stops at the
            # end of the stream and ignores the padding that follows it
            data = zlib.decompressobj().decompress(data)
        else:
            data = data.rstrip(b' ')
        # NOTE(security): pickle can execute arbitrary code. It is reached
        # only after the HMAC check, so the keys must stay secret.
        return pickle.loads(data)
    except (TypeError, pickle.UnpicklingError):
        return None
167
168
169
170
def initialize_urandom():
    """
    This function and the web2py_uuid follow from the following discussion:
    http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09

    At startup web2py computes a number from the machine and the start time:
    uuid.getnode() + int(time.time() * 1e6)

    The six low-order bytes of that number are repeated to fill 16 8-bit
    tokens. The tokens are written to '/dev/urandom' (when that is possible)
    and the number is used to seed random.

    If os.urandom() is not supported, it falls back to using random and
    issues a warning.

    Returns the tuple (unpacked_ctokens, have_urandom): the 16 tokens read
    as two unsigned 64-bit integers, and whether os.urandom() is usable.
    """
    node_id = uuid.getnode()
    microseconds = int(time.time() * 1e6)
    seed = node_id + microseconds
    ctokens = [(seed >> ((i % 6) * 8)) % 256 for i in range(16)]
    random.seed(seed)
    # bytes(bytearray(...)) builds the byte string on Python 2 and 3 alike
    packed = bytes(bytearray(ctokens))
    try:
        os.urandom(1)
    except NotImplementedError:
        have_urandom = False
        logger.warning(
            """Cryptographically secure session management is not possible on your system because
your system does not provide a cryptographically secure entropy source.
This is not specific to web2py; consider deploying on a different operating system.""")
    else:
        have_urandom = True
        try:
            # try to add the tokens to the system entropy source
            with open('/dev/urandom', 'wb') as frandom:
                frandom.write(packed)
        except IOError:
            # best effort only: the device may be missing or read-only
            pass
    unpacked_ctokens = struct.unpack('=QQ', packed)
    return unpacked_ctokens, have_urandom
UNPACKED_CTOKENS, HAVE_URANDOM = initialize_urandom()
218
219
def fast_urandom16(urandom=[], locker=threading.RLock()):
    """
    Returns 16 random bytes taken from a pool that is refilled with a
    single os.urandom(16 * 1024) call whenever it runs empty.

    this is 4x faster than calling os.urandom(16) and prevents
    the "too many files open" issue with concurrent access to os.urandom()

    urandom: the list used as pool; the default list is created once and
        is therefore shared by all calls that do not pass their own
    locker: lock held while the pool is refilled
    """
    try:
        return urandom.pop()
    except IndexError:
        with locker:
            ur = os.urandom(16 * 1024)
            # the first 16 bytes are returned, the other 1023 chunks are
            # stored for the following calls
            urandom += [ur[i:i + 16] for i in range(16, 1024 * 16, 16)]
            return ur[0:16]
235
236
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.

    ctokens: two 64-bit integers XORed into the random bits

    Returns the UUID as a string.
    """
    rand_longs = (random.getrandbits(64), random.getrandbits(64))
    if HAVE_URANDOM:
        urand_longs = struct.unpack('=QQ', fast_urandom16())
    else:
        # XOR with zero leaves the other terms unchanged
        urand_longs = (0, 0)
    byte_s = struct.pack('=QQ',
                         rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
                         rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
256
# Four dot-separated groups of digits; the range of each group is not
# checked by the pattern. Raw string so that \d and \. are not read as
# (invalid) string escapes.
REGEX_IPv4 = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)')
258
259
def is_valid_ip_address(address):
    """
    Tells whether address looks like a valid IPv4 or IPv6 address
    ('localhost' is accepted too).

    >>> is_valid_ip_address('127.0')
    False
    >>> is_valid_ip_address('127.0.0.1')
    True
    >>> is_valid_ip_address('2001:660::1')
    True
    """
    lowered = address.lower()
    # deal with special cases
    if lowered in ('127.0.0.1', 'localhost', '::1', '::ffff:127.0.0.1'):
        return True
    if lowered in ('unknown', ''):
        return False
    if address.count('.') == 3:
        # assume IPv4, possibly written as IPv4-mapped IPv6
        if address.startswith('::ffff:'):
            address = address[7:]
        if hasattr(socket, 'inet_aton'):
            # validate using the OS
            try:
                socket.inet_aton(address)
            except socket.error:
                return False
            return True
        # no inet_aton: validate using the regular expression
        match = REGEX_IPv4.match(address)
        return bool(match and all(0 <= int(match.group(i)) < 256
                                  for i in (1, 2, 3, 4)))
    if hasattr(socket, 'inet_pton'):
        # assume IPv6, validate using the OS
        try:
            socket.inet_pton(socket.AF_INET6, address)
        except socket.error:
            return False
        return True
    # nothing available to check with: assume the address is valid
    return True
296
297
def is_loopback_ip_address(ip=None, addrinfo=None):
    """
    Determines whether the address appears to be a loopback address.
    This assumes that the IP is valid.

    ip: the address as a string
    addrinfo: an entry as returned by socket.getaddrinfo(); for the
        AF_INET and AF_INET6 families its address replaces ip

    Returns False when no string address is available.
    """
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3
    if addrinfo:
        if addrinfo[0] == socket.AF_INET or addrinfo[0] == socket.AF_INET6:
            ip = addrinfo[4]
            # getaddrinfo() stores a sockaddr tuple here whose first item
            # is the address; a plain string is still accepted
            if isinstance(ip, (tuple, list)) and ip:
                ip = ip[0]
    if not isinstance(ip, string_types):
        return False
    # IPv4 or IPv6-embedded IPv4 or IPv4-compatible IPv6
    if ip.count('.') == 3:
        return ip.lower().startswith(('127', '::127', '0:0:0:0:0:0:127',
                                      '::ffff:127', '0:0:0:0:0:ffff:127'))
    # IPv6 loopback
    return ip == '::1' or ip == '0:0:0:0:0:0:0:1'
313
314
def getipaddrinfo(host):
    """
    Filter out non-IP and bad IP addresses from getaddrinfo

    host: host name or address handed to socket.getaddrinfo()

    Returns the AF_INET / AF_INET6 entries whose address is a string, or
    an empty list when the lookup raises socket.error.
    """
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3
    try:
        candidates = socket.getaddrinfo(host, None)
    except socket.error:
        return []
    return [addrinfo for addrinfo in candidates
            if (addrinfo[0] == socket.AF_INET or
                addrinfo[0] == socket.AF_INET6)
            and isinstance(addrinfo[4][0], string_types)]
326