# -*- coding: utf-8 -*-

'''Miscellaneous DAV-related utility functions and their associated
unit tests.
'''

import email.utils
import mimetypes
import time
import os.path
import re
from fnmatch import fnmatchcase


class DAVError(Exception):
    '''A protocol exception that is passed to client through HTTP.

    Two properties:
    - httpstatus: e.g. '404 Not Found'
    - body: None or a string holding the response body

    Argument httpstatus is passed to WebDAV client as HTTP status code.
    Body can optionally be an XML response body; otherwise, exception
    handler generates a text/plain response of the status code.
    '''

    def __init__(self, httpstatus, body = None):
        Exception.__init__(self, httpstatus)
        self.httpstatus = str(httpstatus)
        # Falsy bodies (None, '') are stored unchanged; anything else is
        # coerced to a string.
        self.body = body and str(body)

    def __str__(self):
        return self.httpstatus

    def __repr__(self):
        return ('DAVError(' + repr(self.httpstatus) + ', '
                + repr(self.body) + ')')

    def __eq__(self, other):
        return (isinstance(other, DAVError)
                and self.httpstatus == other.httpstatus
                and self.body == other.body)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so define it explicitly.
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.httpstatus) ^ hash(self.body)


def read_blocks(source, count = None, blocksize = 1024*1024):
    '''Read and yield block-sized strings from open file object, up to
    a total of count bytes. If count is None, read until end of file.
    '''
    while count is None or count > 0:
        if count is not None:
            # Never request more than what is still wanted.
            blocksize = min(count, blocksize)

        data = source.read(blocksize)
        if len(data) == 0:
            return  # End of file

        if count is not None:
            count -= len(data)

        yield data


def write_blocks(dest, blocks):
    '''Write a series of blocks to open file object.'''
    for block in blocks:
        dest.write(block)


def _split_abspath(path):
    '''Return the components of the absolute form of path.'''
    parts = os.path.abspath(path).split(os.path.sep)
    if parts == ['', '']:
        parts = ['']  # When path is '/', split gives '',''
    return parts


def path_inside_directory(path, root):
    '''Check if path is inside root directory.

    Both arguments are made absolute first, so '..' components are
    resolved before the comparison. Returns True also when path == root.
    '''
    path_parts = _split_abspath(path)
    root_parts = _split_abspath(root)
    return path_parts[:len(root_parts)] == root_parts


def get_relpath(path, root):
    '''Get the relative path after the root path.

    This differs from os.path.relpath slightly:
    - The result never has trailing or leading / or ..
    - The result is empty string if path == root
    - Path must be inside root directory.

    Raises AssertionError if path is not inside root.
    '''
    path_parts = _split_abspath(path)
    root_parts = _split_abspath(root)

    # Raised explicitly (instead of using an assert statement) so that the
    # containment check is not stripped when running with python -O.
    if path_parts[:len(root_parts)] != root_parts:
        raise AssertionError('%r is not inside %r' % (path, root))

    return os.path.sep.join(path_parts[len(root_parts):])


def get_isoformat(timestamp):
    '''Format the timestamp according to ISO8601 / RFC3339.'''
    t = time.gmtime(timestamp)
    return time.strftime('%Y-%m-%dT%H:%M:%SZ', t)


def get_rfcformat(timestamp):
    '''Format the timestamp according to RFC822, always in UTC.'''
    t = time.gmtime(timestamp)
    # The zone is written literally: %z on a gmtime() result is platform
    # dependent and may report the local offset instead of +0000.
    return time.strftime('%a, %d %b %Y %H:%M:%S +0000', t)


def get_usertime(timestamp):
    '''Format the timestamp for reading by user (local time).'''
    t = time.localtime(timestamp)
    return time.strftime('%d-%b-%Y %H:%M:%S', t)


def set_mtime(real_path, rfctime):
    '''Set file access and modification time based on a RFC822 timestamp.

    Raises ValueError if rfctime can not be parsed. A timestamp without
    a zone is interpreted as local time by email.utils.mktime_tz.
    '''
    parsed = email.utils.parsedate_tz(rfctime)
    if parsed is None:
        raise ValueError('Invalid RFC822 timestamp: %r' % (rfctime,))

    # os.utime wants seconds since the epoch, not a time tuple.
    timestamp = email.utils.mktime_tz(parsed)
    os.utime(real_path, (timestamp, timestamp))


def pretty_unit(value, base=1000, minunit=None, format="%0.1f"):
    '''Finds the correct unit and returns a pretty string.

    pretty_unit(4190591051, base=1024) = "3.9 Gi"

    Base must be 1000 or 1024, otherwise ValueError is raised. Minunit is
    the smallest multiplier that may be used as the unit; it defaults to
    base, so that the result is expressed at least in k / Ki. Values too
    large for the biggest unit are expressed in the biggest unit.

    From http://github.com/str4nd/bittivahti/
    '''
    if not minunit:
        minunit = base

    # Units based on base
    if base == 1000:
        units = [' ', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
    elif base == 1024:
        units = [' ', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi']
    else:
        raise ValueError("The unit base has to be 1000 or 1024")

    # Divide until below threshold or base
    v = float(value)
    u = base
    for unit in units[:-1]:
        if v >= base or u <= minunit:
            v = v / base
            u = u * base
        else:
            return format % v + " " + unit

    # Ran out of smaller units: v is now expressed in the largest one.
    return format % v + " " + units[-1]


def get_mimetype(real_path):
    '''Use mimetypes module to guess Content-Type for the file.
    If it fails, use application/octet-stream.
    '''
    mimetype = mimetypes.guess_type(real_path)[0]

    if not mimetype:
        mimetype = 'application/octet-stream'

    return mimetype


def create_etag(real_path):
    '''Get an unique identifier for this revision of the file.
    This is used by HTTP clients for caching purposes.

    The tag is built from the modification time and size of the file.
    '''
    return ('"' + str(os.path.getmtime(real_path))
            + 'S' + str(os.path.getsize(real_path)) + '"')


def compare_etags(etag, etag_list):
    '''Compare the specified etag against the list.

    List can be either a single tag, a list separated with comma,
    or an asterisk:
    - '"tag"': matches only if etag == '"tag"'
    - '"tag1", "tag2"': matches if etag in ['"tag1"', '"tag2"']
    - '*': matches any etag

    Note: the ETags generated by this application do not contain commas.
    This function can't match against ETags with commas.
    '''
    parts = [e.strip() for e in etag_list.split(',')]
    return parts == ['*'] or etag in parts


def add_to_dict_list(dictionary, key, item):
    '''Add the item to the list stored in the dictionary with
    the specified key. If the key does not exist, create a new list.
    '''
    dictionary.setdefault(key, []).append(item)


def search_directory(directory, depth = -1):
    '''Find all files and directories under a directory tree,
    yielding paths.

    Depth is the recursion limit:
    0 == yield just the start directory,
    1 == yield start directory and files there,
    -1 == infinite.
    '''
    yield directory

    if depth == 0 or not os.path.isdir(directory):
        return

    for filename in os.listdir(directory):
        path = os.path.join(directory, filename)

        # Test the joined path; the bare filename would be resolved
        # against the current working directory.
        if os.path.isdir(path):
            for subpath in search_directory(path, depth - 1):
                yield subpath
        else:
            yield path


def _zip_member_name(rel_path):
    '''Convert a relative path to a name suitable for a ZIP archive,
    replacing characters that can not be represented in CP437.
    '''
    if isinstance(rel_path, bytes):
        # Byte string paths are assumed to be UTF-8.
        rel_path = rel_path.decode('utf-8', 'replace')

    name = rel_path.encode('cp437', 'replace')

    if str is not bytes:
        # Python 3: zipfile requires text member names.
        name = name.decode('cp437')

    return name


def add_to_zip_recursively(zipobj, real_path, root_dir, check_read):
    '''Adds the file at real_path, and if it is a directory, all files
    under it to a ZIP archive. Filenames are converted from UTF-8 to CP437.

    Root_dir is stripped from beginning of each file name.
    Check_read is a function that returns False for files that should not
    be included in archive.
    '''
    if not root_dir.endswith('/'):
        root_dir += '/'

    for path in search_directory(real_path):
        # Only regular entries are stored; directories are implied by
        # the names of the files inside them.
        if os.path.isdir(path) or not check_read(path):
            continue

        assert path[:len(root_dir)] == root_dir
        rel_path = path[len(root_dir):]
        zipobj.write(path, _zip_member_name(rel_path))


def compare_path(real_path, patterns):
    '''Compare a path to a list of patterns. Patterns can be either
    shell glob patterns that are compared against each path component,
    or functions that get passed the complete path.

    Returns True if any of the patterns matches.
    '''
    real_path = os.path.normpath(real_path)
    parts = real_path.strip('/').split('/')

    for pattern in patterns:
        if callable(pattern):
            if pattern(real_path):
                return True
        elif any(fnmatchcase(part, pattern) for part in parts):
            return True

    return False


def parse_if_list(string):
    r'''Read a "List" structure as defined in RFC4918.
    Returns list of tuples (Type, Invert, Value).

    To make parsing easier, state tokens or ETags should not contain any
    of the following characters: ()[]<>"
    The state tokens and ETags generated by this program satisfy this rule
    and I don't expect any sane client to pass other values.

    Resource tags can contain anything except >

    RFC4918 Section 10.4.2:
    List = "(" 1*Condition ")"
    Condition = ["Not"] (State-token | "[" entity-tag "]")
    entity-tag = [ weak ] opaque-tag
    weak = "W/"
    opaque-tag = quoted-string
    quoted-string = ( <"> *(qdtext | quoted-pair ) <"> )
    qdtext = <any TEXT except <">>
    quoted-pair = "\" CHAR
    State-token = Coded-URL
    Coded-URL = "<" absolute-URI ">"
    '''
    results = []

    conditions = re.findall(r'(Not)?\s*([<\[][^>\]]+[>\]])', string)
    for c_not, c_tag in conditions:
        if c_tag.startswith('['):
            c_type = 'etag'
            c_tag = c_tag.strip('[]')
        else:
            c_type = 'token'
            c_tag = c_tag.strip('<>')

        results.append((c_type, bool(c_not), c_tag))

    return results


def parse_if_header(if_header):
    '''Parse a HTTP If: -header.

    Returns a list of tuples of resource url and conditions.
    Each condition is a tuple of (Type, Invert, Value), where:
    Type is 'etag' or 'token',
    Invert is True or False and
    Value is a string.

    Each list of conditions must match completely, and any of the tuples
    in the top-most list must match.

    parse_if_header(
        '(<urn:uuid:181d4fae-7d8c-11d0-a765-00a0c91e6bf2> '
        + '["I am an ETag"]) (["I am another ETag"])')
    should give:
    [(None, [('token', False,
              'urn:uuid:181d4fae-7d8c-11d0-a765-00a0c91e6bf2'),
             ('etag', False, '"I am an ETag"')]),
     (None, [('etag', False, '"I am another ETag"')])
    ]

    An empty header gives an empty list. In a Tagged-list, every List
    is reported with the Resource-Tag that most recently preceded it.

    RFC4918:
    If = "If" ":" ( 1*No-tag-list | 1*Tagged-list )
    No-tag-list = List
    Tagged-list = Resource-Tag 1*List
    List = "(" 1*Condition ")"
    Resource-Tag = "<" Simple-ref ">"
    '''
    if_header = if_header.strip()
    results = []

    if if_header.startswith('('):
        # No-tag-lists: every list applies to the request resource.
        for l_contents in re.findall(r'\(([^\)]+)\)', if_header):
            results.append((None, parse_if_list(l_contents)))
        return results

    # Tagged-lists.
    # Group 1: Resource-Tag (empty if the list has none of its own),
    # Group 2: List contents
    lists = re.findall(r'(?:<([^>]+)>\s*)?\(([^\)]+)\)', if_header)

    current_tag = None
    for l_tag, l_contents in lists:
        if l_tag:
            current_tag = l_tag
        elif current_tag is None:
            # A list before any resource tag is malformed; ignore it.
            continue

        results.append((current_tag, parse_if_list(l_contents)))

    return results


def parse_timeout(value):
    '''Parses a TimeType construction, returning timeout in seconds
    or None for infinity. Invalid strings raise ValueError.
    '''
    value = value.strip()
    prefix = 'Second-'

    if value == 'Infinite':
        return None
    elif value.startswith(prefix):
        return int(value[len(prefix):])
    else:
        raise ValueError('Unknown timeout type')


if __name__ == '__main__':
    print("Unit tests")

    assert path_inside_directory('/tmp/foobar', '/tmp')
    assert path_inside_directory('/', '/')
    assert path_inside_directory('/foobar', '/')
    assert path_inside_directory('foobar', '')
    assert not path_inside_directory('/', '/tmp')
    assert not path_inside_directory('/tmp/../tmp/..', '/tmp')
    assert not path_inside_directory('..', '')

    assert get_relpath('/tmp/foobar', '/tmp') == 'foobar'
    assert get_relpath('/tmp/', '/tmp') == ''
    assert get_relpath('/foobar', '/') == 'foobar'

    test_dict = {}
    add_to_dict_list(test_dict, 'ankka', 'heppa')
    add_to_dict_list(test_dict, 'ankka', 'koira')
    assert test_dict['ankka'] == ['heppa', 'koira']

    assert compare_etags('"foo"', '"foo"')
    assert not compare_etags('"foo"', '"foo2"')
    assert compare_etags('"foo"', '"foo", "foo2"')
    assert compare_etags('"foo"', '"foo2","foo"')
    assert compare_etags('"foo"', '*')
    assert not compare_etags('"foo"', '')

    assert compare_path('/tmp/.svn/foo', ['foo'])
    assert not compare_path('/tmp/.svn/foo2', ['foo'])
    assert compare_path('/tmp/.svn/foo', ['.svn'])
    assert compare_path('/tmp/hack.php', ['*.php'])
    assert compare_path('/tmp/.hack.php', ['*.php'])
    assert compare_path('/tmp/hack.php.txt', ['*.php.*'])
    assert compare_path('/tmp/foo', ['*'])

    assert (parse_if_list('(["Foobar"]Not["foobar"])') ==
            [('etag', False, '"Foobar"'), ('etag', True, '"foobar"')])

    assert (parse_if_header(
                '(<urn:uuid:181d4fae-7d8c-11d0-a765-00a0c91e6bf2> '
                + '["I am an ETag"]) (["I am another ETag"])') ==
            [(None, [('token', False,
                      'urn:uuid:181d4fae-7d8c-11d0-a765-00a0c91e6bf2'),
                     ('etag', False, '"I am an ETag"')]),
             (None, [('etag', False, '"I am another ETag"')])
            ])

    assert (parse_if_header(
                r'''<http://user:pass@test.com/~-._%20!$&'()*+,;=> '''
                + r'''(["Etagfoo"])''') ==
            [(r'''http://user:pass@test.com/~-._%20!$&'()*+,;=''',
              [('etag', False, r'"Etagfoo"')])
            ])

    assert (parse_if_header('<foo> (Not["Etag"])') ==
            [('foo', [('etag', True, '"Etag"')])])

    assert parse_timeout('Second-1234') == 1234
    assert parse_timeout('Infinite') is None

    print("Unit tests OK")