initial push
This commit is contained in:
parent
6ada1e1a08
commit
8c45165c19
|
@ -0,0 +1,581 @@
|
|||
"""Pythonic command-line interface parser that will make you smile.
|
||||
|
||||
* http://docopt.org
|
||||
* Repository and issue-tracker: https://github.com/docopt/docopt
|
||||
* Licensed under terms of MIT license (see LICENSE-MIT)
|
||||
* Copyright (c) 2013 Vladimir Keleshev, vladimir@keleshev.com
|
||||
|
||||
"""
|
||||
import sys
|
||||
import re
|
||||
|
||||
|
||||
__all__ = ['docopt']
|
||||
__version__ = '0.6.2'
|
||||
|
||||
|
||||
class DocoptLanguageError(Exception):

    """Error in construction of usage-message by developer.

    Raised during parsing of the docstring itself (e.g. unmatched
    brackets); `Tokens.from_pattern` wires this in as the error class.
    """
|
||||
|
||||
|
||||
class DocoptExit(SystemExit):

    """Exit in case user invoked program with incorrect arguments."""

    # Filled in by docopt() with the usage section of the docstring.
    usage = ''

    def __init__(self, message=''):
        full_message = '\n'.join((message, self.usage)).strip()
        SystemExit.__init__(self, full_message)
|
||||
|
||||
|
||||
class Pattern(object):

    """Base class for all nodes of the usage-pattern tree.

    Equality and hashing are defined via repr(), so two structurally
    identical patterns compare equal.
    """

    def __eq__(self, other):
        return repr(self) == repr(other)

    def __hash__(self):
        return hash(repr(self))

    def fix(self):
        """Normalize the tree in place; return self for chaining."""
        self.fix_identities()
        self.fix_repeating_arguments()
        return self

    def fix_identities(self, uniq=None):
        """Make pattern-tree tips point to same object if they are equal."""
        if not hasattr(self, 'children'):
            return self  # leaf node: nothing to deduplicate
        uniq = list(set(self.flat())) if uniq is None else uniq
        for i, child in enumerate(self.children):
            if not hasattr(child, 'children'):
                assert child in uniq
                # Replace equal leaves by one shared instance so a value
                # collected for a leaf is visible everywhere it occurs.
                self.children[i] = uniq[uniq.index(child)]
            else:
                child.fix_identities(uniq)

    def fix_repeating_arguments(self):
        """Fix elements that should accumulate/increment values."""
        # transform() flattens the tree into a single Either of flat cases;
        # a leaf appearing more than once in one case is a repeating element.
        either = [list(child.children) for child in transform(self).children]
        for case in either:
            for e in [child for child in case if case.count(child) > 1]:
                if type(e) is Argument or type(e) is Option and e.argcount:
                    # Repeating argument/valued option accumulates a list.
                    if e.value is None:
                        e.value = []
                    elif type(e.value) is not list:
                        e.value = e.value.split()
                if type(e) is Command or type(e) is Option and e.argcount == 0:
                    # Repeating flag/command counts its occurrences.
                    e.value = 0
        return self
|
||||
|
||||
|
||||
def transform(pattern):
    """Expand pattern into an (almost) equivalent one, but with single Either.

    Example: ((-a | -b) (-c | -d)) => (-a -c | -a -d | -b -c | -b -d)
    Quirks: [-a] => (-a), (-a...) => (-a -a)

    """
    result = []
    # Each entry of `groups` is one flat alternative still being expanded.
    groups = [[pattern]]
    while groups:
        children = groups.pop(0)
        parents = [Required, Optional, OptionsShortcut, Either, OneOrMore]
        if any(t in map(type, children) for t in parents):
            # Expand the first branch node found in this alternative.
            child = [c for c in children if type(c) in parents][0]
            children.remove(child)
            if type(child) is Either:
                # One new alternative per Either branch.
                for c in child.children:
                    groups.append([c] + children)
            elif type(child) is OneOrMore:
                # Repetition is approximated by doubling the child (see quirks).
                groups.append(child.children * 2 + children)
            else:
                # Required/Optional/OptionsShortcut: splice children in place.
                groups.append(child.children + children)
        else:
            # Fully flattened: only leaves remain in this alternative.
            result.append(children)
    return Either(*[Required(*e) for e in result])
|
||||
|
||||
|
||||
class LeafPattern(Pattern):

    """Leaf/terminal node of a pattern tree."""

    def __init__(self, name, value=None):
        self.name, self.value = name, value

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.name, self.value)

    def flat(self, *types):
        """Return [self] if this leaf is of a requested type (or any)."""
        return [self] if not types or type(self) in types else []

    def match(self, left, collected=None):
        """Match this leaf against the token list `left`.

        Returns (matched, remaining_tokens, collected_matches).
        """
        collected = [] if collected is None else collected
        pos, match = self.single_match(left)
        if match is None:
            return False, left, collected
        # Remove the consumed token from the remaining input.
        left_ = left[:pos] + left[pos + 1:]
        same_name = [a for a in collected if a.name == self.name]
        if type(self.value) in (int, list):
            # Repeating element (see fix_repeating_arguments): count
            # occurrences (int) or accumulate values (list).
            if type(self.value) is int:
                increment = 1
            else:
                increment = ([match.value] if type(match.value) is str
                             else match.value)
            if not same_name:
                match.value = increment
                return True, left_, collected + [match]
            same_name[0].value += increment
            return True, left_, collected
        return True, left_, collected + [match]
|
||||
|
||||
|
||||
class BranchPattern(Pattern):

    """Branch/inner node of a pattern tree."""

    def __init__(self, *children):
        self.children = list(children)

    def __repr__(self):
        inner = ', '.join(repr(child) for child in self.children)
        return '%s(%s)' % (self.__class__.__name__, inner)

    def flat(self, *types):
        """Collect leaves (or nodes of the requested types) depth-first."""
        if type(self) in types:
            return [self]
        collected = []
        for child in self.children:
            collected.extend(child.flat(*types))
        return collected
|
||||
|
||||
|
||||
class Argument(LeafPattern):

    """Positional argument, e.g. `<host>`."""

    def single_match(self, left):
        """Return (position, match) of the first Argument token in `left`."""
        for n, pattern in enumerate(left):
            if type(pattern) is Argument:
                return n, Argument(self.name, pattern.value)
        return None, None

    @classmethod
    def parse(class_, source):
        """Build an Argument from text like `<name>  [default: x]`."""
        # Raw strings: '\S' / '\[' in plain literals are invalid escape
        # sequences and warn (and will eventually error) on Python 3.6+.
        name = re.findall(r'(<\S*?>)', source)[0]
        value = re.findall(r'\[default: (.*)\]', source, flags=re.I)
        return class_(name, value[0] if value else None)
|
||||
|
||||
|
||||
class Command(Argument):

    """Command: an argument token that must equal this pattern's name."""

    def __init__(self, name, value=False):
        self.name, self.value = name, value

    def single_match(self, left):
        """Match if the first Argument token in `left` equals our name."""
        for n, pattern in enumerate(left):
            if type(pattern) is not Argument:
                continue
            if pattern.value == self.name:
                return n, Command(self.name, True)
            # First positional token decides; a mismatch ends the search.
            return None, None
        return None, None
|
||||
|
||||
|
||||
class Option(LeafPattern):

    """Option, e.g. `-h`/`--help`, optionally taking a single argument."""

    def __init__(self, short=None, long=None, argcount=0, value=False):
        assert argcount in (0, 1)
        self.short, self.long, self.argcount = short, long, argcount
        # A valued option defaults to None (unset); a plain flag to False.
        self.value = None if value is False and argcount else value

    @classmethod
    def parse(class_, option_description):
        """Build an Option from one line of an `Options:` section."""
        short, long, argcount, value = None, None, 0, False
        # Option names are separated from the description by 2+ spaces;
        # a single-space partition would split inside the names themselves.
        options, _, description = option_description.strip().partition('  ')
        options = options.replace(',', ' ').replace('=', ' ')
        for s in options.split():
            if s.startswith('--'):
                long = s
            elif s.startswith('-'):
                short = s
            else:
                # A bare word among the names is the option's argument.
                argcount = 1
        if argcount:
            # Raw string avoids invalid-escape-sequence warning (Py3.6+).
            matched = re.findall(r'\[default: (.*)\]', description, flags=re.I)
            value = matched[0] if matched else None
        return class_(short, long, argcount, value)

    def single_match(self, left):
        """Return (position, token) of the first token with our name."""
        for n, pattern in enumerate(left):
            if self.name == pattern.name:
                return n, pattern
        return None, None

    @property
    def name(self):
        # Prefer the long form as the canonical name.
        return self.long or self.short

    def __repr__(self):
        return 'Option(%r, %r, %r, %r)' % (self.short, self.long,
                                           self.argcount, self.value)
|
||||
|
||||
|
||||
class Required(BranchPattern):

    """Group whose children must all match, in order."""

    def match(self, left, collected=None):
        if collected is None:
            collected = []
        remaining, gathered = left, collected
        for pattern in self.children:
            matched, remaining, gathered = pattern.match(remaining, gathered)
            if not matched:
                # One child failed: the whole group fails, state unchanged.
                return False, left, collected
        return True, remaining, gathered
|
||||
|
||||
|
||||
class Optional(BranchPattern):

    """Group whose children may match; the group itself always succeeds."""

    def match(self, left, collected=None):
        if collected is None:
            collected = []
        for pattern in self.children:
            # Match result is ignored: optional children may fail freely.
            _, left, collected = pattern.match(left, collected)
        return True, left, collected
|
||||
|
||||
|
||||
class OptionsShortcut(Optional):

    """Marker/placeholder for [options] shortcut.

    Its children are filled in by docopt() with the documented options
    not already mentioned in the usage pattern.
    """
|
||||
|
||||
|
||||
class OneOrMore(BranchPattern):

    def match(self, left, collected=None):
        """Match the single child repeatedly; succeed if it matched once."""
        assert len(self.children) == 1
        collected = [] if collected is None else collected
        l = left
        c = collected
        l_ = None  # `l` from the previous round, used to detect no progress
        matched = True
        times = 0
        while matched:
            # could it be that something didn't match but changed l or c?
            matched, l, c = self.children[0].match(l, c)
            times += 1 if matched else 0
            if l_ == l:
                break  # no tokens consumed this round: stop to avoid looping
            l_ = l
        if times >= 1:
            return True, l, c
        return False, left, collected
|
||||
|
||||
|
||||
class Either(BranchPattern):

    """Group of alternatives; the best-matching child wins."""

    def match(self, left, collected=None):
        if collected is None:
            collected = []
        successful = []
        for pattern in self.children:
            outcome = pattern.match(left, collected)
            if outcome[0]:
                successful.append(outcome)
        if not successful:
            return False, left, collected
        # Prefer the alternative that consumes the most input tokens.
        return min(successful, key=lambda outcome: len(outcome[1]))
|
||||
|
||||
|
||||
class Tokens(list):

    """Token stream (a list) carrying the error class to raise on failure."""

    def __init__(self, source, error=DocoptExit):
        self += source.split() if hasattr(source, 'split') else source
        self.error = error

    @staticmethod
    def from_pattern(source):
        """Tokenize a usage pattern: brackets, pipes and '...' separate."""
        source = re.sub(r'([\[\]\(\)\|]|\.\.\.)', r' \1 ', source)
        # Raw string: '\s'/'\S' in a plain literal are invalid escape
        # sequences and warn (and will eventually error) on Python 3.6+.
        source = [s for s in re.split(r'\s+|(\S*<.*?>)', source) if s]
        return Tokens(source, error=DocoptLanguageError)

    def move(self):
        """Pop and return the next token, or None when exhausted."""
        return self.pop(0) if len(self) else None

    def current(self):
        """Return the next token without consuming it, or None."""
        return self[0] if len(self) else None
|
||||
|
||||
|
||||
def parse_long(tokens, options):
    """long ::= '--' chars [ ( ' ' | '=' ) chars ] ;

    Consumes one --long token (and possibly its argument) from `tokens`
    and returns a one-element list with the matched/invented Option.
    """
    long, eq, value = tokens.move().partition('=')
    assert long.startswith('--')
    value = None if eq == value == '' else value
    similar = [o for o in options if o.long == long]
    if tokens.error is DocoptExit and similar == []:  # if no exact match
        # Parsing argv (not the docstring): allow unambiguous prefixes.
        similar = [o for o in options if o.long and o.long.startswith(long)]
    if len(similar) > 1:  # might be simply specified ambiguously 2+ times?
        raise tokens.error('%s is not a unique prefix: %s?' %
                           (long, ', '.join(o.long for o in similar)))
    elif len(similar) < 1:
        # Unknown option: invent it on the fly so matching can proceed.
        argcount = 1 if eq == '=' else 0
        o = Option(None, long, argcount)
        options.append(o)
        if tokens.error is DocoptExit:
            o = Option(None, long, argcount, value if argcount else True)
    else:
        # Copy the known option so the shared definition is not mutated.
        o = Option(similar[0].short, similar[0].long,
                   similar[0].argcount, similar[0].value)
        if o.argcount == 0:
            if value is not None:
                raise tokens.error('%s must not have an argument' % o.long)
        else:
            if value is None:
                if tokens.current() in [None, '--']:
                    raise tokens.error('%s requires argument' % o.long)
                value = tokens.move()
        if tokens.error is DocoptExit:
            o.value = value if value is not None else True
    return [o]
|
||||
|
||||
|
||||
def parse_shorts(tokens, options):
    """shorts ::= '-' ( chars )* [ [ ' ' ] chars ] ;

    Consumes one stacked-shorts token (e.g. '-abc') and returns the
    list of matched/invented Options.
    """
    token = tokens.move()
    assert token.startswith('-') and not token.startswith('--')
    left = token.lstrip('-')
    parsed = []
    while left != '':
        # Consume one short-option letter at a time ('-abc' -> -a -b -c).
        short, left = '-' + left[0], left[1:]
        similar = [o for o in options if o.short == short]
        if len(similar) > 1:
            raise tokens.error('%s is specified ambiguously %d times' %
                               (short, len(similar)))
        elif len(similar) < 1:
            # Unknown option: invent it on the fly so matching can proceed.
            o = Option(short, None, 0)
            options.append(o)
            if tokens.error is DocoptExit:
                o = Option(short, None, 0, True)
        else:  # why copying is necessary here?
            o = Option(short, similar[0].long,
                       similar[0].argcount, similar[0].value)
            value = None
            if o.argcount != 0:
                if left == '':
                    if tokens.current() in [None, '--']:
                        raise tokens.error('%s requires argument' % short)
                    value = tokens.move()
                else:
                    # Remainder of the token is this option's argument.
                    value = left
                    left = ''
            if tokens.error is DocoptExit:
                o.value = value if value is not None else True
        parsed.append(o)
    return parsed
|
||||
|
||||
|
||||
def parse_pattern(source, options):
    """Parse a formal usage pattern (output of formal_usage) into a tree."""
    tokens = Tokens.from_pattern(source)
    result = parse_expr(tokens, options)
    if tokens.current() is not None:
        # Leftover tokens mean the docstring's usage pattern is malformed.
        raise tokens.error('unexpected ending: %r' % ' '.join(tokens))
    return Required(*result)
|
||||
|
||||
|
||||
def parse_expr(tokens, options):
    """expr ::= seq ( '|' seq )* ;"""
    seq = parse_seq(tokens, options)
    if tokens.current() != '|':
        return seq
    # Group multi-element sequences so that '|' binds loosest.
    result = [Required(*seq)] if len(seq) > 1 else seq
    while tokens.current() == '|':
        tokens.move()
        seq = parse_seq(tokens, options)
        result += [Required(*seq)] if len(seq) > 1 else seq
    return [Either(*result)] if len(result) > 1 else result
|
||||
|
||||
|
||||
def parse_seq(tokens, options):
    """seq ::= ( atom [ '...' ] )* ;"""
    result = []
    stoppers = (None, ']', ')', '|')
    while tokens.current() not in stoppers:
        atom = parse_atom(tokens, options)
        if tokens.current() == '...':
            tokens.move()
            # Wrap the repeated atom in a OneOrMore node.
            atom = [OneOrMore(*atom)]
        result.extend(atom)
    return result
|
||||
|
||||
|
||||
def parse_atom(tokens, options):
    """atom ::= '(' expr ')' | '[' expr ']' | 'options'
             | long | shorts | argument | command ;
    """
    token = tokens.current()
    result = []
    if token in '([':
        tokens.move()
        # Pick the closing bracket and node type for this group kind.
        matching, pattern = {'(': [')', Required], '[': [']', Optional]}[token]
        result = pattern(*parse_expr(tokens, options))
        if tokens.move() != matching:
            raise tokens.error("unmatched '%s'" % token)
        return [result]
    elif token == 'options':
        tokens.move()
        return [OptionsShortcut()]
    elif token.startswith('--') and token != '--':
        return parse_long(tokens, options)
    elif token.startswith('-') and token not in ('-', '--'):
        return parse_shorts(tokens, options)
    elif token.startswith('<') and token.endswith('>') or token.isupper():
        # <angle-bracketed> or UPPERCASE names are positional arguments.
        return [Argument(tokens.move())]
    else:
        # Anything else is a literal command word.
        return [Command(tokens.move())]
|
||||
|
||||
|
||||
def parse_argv(tokens, options, options_first=False):
    """Parse command-line argument vector.

    If options_first:
        argv ::= [ long | shorts ]* [ argument ]* [ '--' [ argument ]* ] ;
    else:
        argv ::= [ long | shorts | argument ]* [ '--' [ argument ]* ] ;

    """
    parsed = []
    while tokens.current() is not None:
        if tokens.current() == '--':
            # Everything after a literal '--' is positional.
            return parsed + [Argument(None, v) for v in tokens]
        elif tokens.current().startswith('--'):
            parsed += parse_long(tokens, options)
        elif tokens.current().startswith('-') and tokens.current() != '-':
            parsed += parse_shorts(tokens, options)
        elif options_first:
            # First positional token ends option parsing in this mode.
            return parsed + [Argument(None, v) for v in tokens]
        else:
            parsed.append(Argument(None, tokens.move()))
    return parsed
|
||||
|
||||
|
||||
def parse_defaults(doc):
    """Collect Option objects from every `options:` section of `doc`."""
    defaults = []
    for s in parse_section('options:', doc):
        # FIXME corner case "bla: options: --foo"
        _, _, s = s.partition(':')  # get rid of "options:"
        # Raw string: '\S' in a plain literal is an invalid escape
        # sequence and warns (eventually errors) on Python 3.6+.
        # Each option description starts on a line beginning with '-'.
        split = re.split(r'\n[ \t]*(-\S+?)', '\n' + s)[1:]
        split = [s1 + s2 for s1, s2 in zip(split[::2], split[1::2])]
        options = [Option.parse(s) for s in split if s.startswith('-')]
        defaults += options
    return defaults
|
||||
|
||||
|
||||
def parse_section(name, source):
    """Return all sections of `source` whose header line contains `name`.

    A section is the header line plus every following indented line;
    matching is case-insensitive and each section is stripped.
    """
    # Raw strings: '\n'/'\t' are fine in plain literals, but keeping the
    # whole regex raw avoids mixed escaping and silences no-op escapes.
    pattern = re.compile(r'^([^\n]*' + name + r'[^\n]*\n?(?:[ \t].*?(?:\n|$))*)',
                         re.IGNORECASE | re.MULTILINE)
    return [s.strip() for s in pattern.findall(source)]
|
||||
|
||||
|
||||
def formal_usage(section):
    """Turn a usage section into one formal pattern string.

    Each repetition of the program name starts a new alternative,
    so multiple usage lines become '( ... ) | ( ... )'.
    """
    _, _, section = section.partition(':')  # drop "usage:"
    words = section.split()
    program = words[0]
    alternatives = []
    for word in words[1:]:
        alternatives.append(') | (' if word == program else word)
    return '( ' + ' '.join(alternatives) + ' )'
|
||||
|
||||
|
||||
def extras(help, version, options, doc):
    """Print help or version and exit, if the parsed argv asks for it.

    `options` is the list of parsed argv patterns; `help`/`version`
    are the flags given to docopt().
    """
    if help and any((o.name in ('-h', '--help')) and o.value for o in options):
        print(doc.strip("\n"))
        sys.exit()
    if version and any(o.name == '--version' and o.value for o in options):
        print(version)
        sys.exit()
|
||||
|
||||
|
||||
class Dict(dict):

    """dict with a stable, sorted, one-entry-per-line repr."""

    def __repr__(self):
        entries = ('%r: %r' % item for item in sorted(self.items()))
        return '{%s}' % ',\n '.join(entries)
|
||||
|
||||
|
||||
def docopt(doc, argv=None, help=True, version=None, options_first=False):
    """Parse `argv` based on command-line interface described in `doc`.

    `docopt` creates your command-line interface based on its
    description that you pass as `doc`. Such description can contain
    --options, <positional-argument>, commands, which could be
    [optional], (required), (mutually | exclusive) or repeated...

    Parameters
    ----------
    doc : str
        Description of your command-line interface.
    argv : list of str, optional
        Argument vector to be parsed. sys.argv[1:] is used if not
        provided.
    help : bool (default: True)
        Set to False to disable automatic help on -h or --help
        options.
    version : any object
        If passed, the object will be printed if --version is in
        `argv`.
    options_first : bool (default: False)
        Set to True to require options precede positional arguments,
        i.e. to forbid options and positional arguments intermix.

    Returns
    -------
    args : dict
        A dictionary, where keys are names of command-line elements
        such as e.g. "--verbose" and "<path>", and values are the
        parsed values of those elements.

    Example
    -------
    >>> from docopt import docopt
    >>> doc = '''
    ... Usage:
    ...     my_program tcp <host> <port> [--timeout=<seconds>]
    ...     my_program serial <port> [--baud=<n>] [--timeout=<seconds>]
    ...     my_program (-h | --help | --version)
    ...
    ... Options:
    ...     -h, --help  Show this screen and exit.
    ...     --baud=<n>  Baudrate [default: 9600]
    ... '''
    >>> argv = ['tcp', '127.0.0.1', '80', '--timeout', '30']
    >>> docopt(doc, argv)
    {'--baud': '9600',
     '--help': False,
     '--timeout': '30',
     '--version': False,
     '<host>': '127.0.0.1',
     '<port>': '80',
     'serial': False,
     'tcp': True}

    See also
    --------
    * For video introduction see http://docopt.org
    * Full documentation is available in README.rst as well as online
      at https://github.com/docopt/docopt#readme

    """
    argv = sys.argv[1:] if argv is None else argv

    usage_sections = parse_section('usage:', doc)
    if len(usage_sections) == 0:
        raise DocoptLanguageError('"usage:" (case-insensitive) not found.')
    if len(usage_sections) > 1:
        raise DocoptLanguageError('More than one "usage:" (case-insensitive).')
    # DocoptExit prints this usage text on invalid invocation.
    DocoptExit.usage = usage_sections[0]

    options = parse_defaults(doc)
    pattern = parse_pattern(formal_usage(DocoptExit.usage), options)
    # [default] syntax for argument is disabled
    #for a in pattern.flat(Argument):
    #    same_name = [d for d in arguments if d.name == a.name]
    #    if same_name:
    #        a.value = same_name[0].value
    argv = parse_argv(Tokens(argv), list(options), options_first)
    pattern_options = set(pattern.flat(Option))
    for options_shortcut in pattern.flat(OptionsShortcut):
        # Expand [options] to all documented options not already
        # mentioned explicitly in the usage pattern.
        doc_options = parse_defaults(doc)
        options_shortcut.children = list(set(doc_options) - pattern_options)
        #if any_options:
        #    options_shortcut.children += [Option(o.short, o.long, o.argcount)
        #                                  for o in argv if type(o) is Option]
    # Handle --help / --version before matching.
    extras(help, version, argv, doc)
    matched, left, collected = pattern.fix().match(argv)
    if matched and left == []:  # better error message if left?
        # Every pattern leaf contributes a key; collected values override
        # the defaults because dict() keeps the last occurrence of a key.
        return Dict((a.name, a.value) for a in (pattern.flat() + collected))
    raise DocoptExit()
|
|
@ -0,0 +1,262 @@
|
|||
"""
|
||||
Read Exif metadata from tiff and jpeg files.
|
||||
"""
|
||||
|
||||
from .exif_log import get_logger
|
||||
from .classes import *
|
||||
from .tags import *
|
||||
from .utils import ord_
|
||||
|
||||
__version__ = '2.1.2'
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
def increment_base(data, base):
    # Big-endian 16-bit segment length at base+2/base+3, plus 2 for the
    # marker bytes: the distance from `base` to the next JPEG segment.
    return ord_(data[base + 2]) * 256 + ord_(data[base + 3]) + 2
|
||||
|
||||
|
||||
def process_file(f, stop_tag=DEFAULT_STOP_TAG, details=True, strict=False, debug=False, truncate_tags=True):
    """
    Process an image file (expects an open file object).

    This is the function that has to deal with all the arbitrary nasty bits
    of the EXIF standard.

    Returns a dict of tags; an empty dict when the file format is not
    recognized or no EXIF-like header is found.
    """

    # by default do not fake an EXIF beginning
    fake_exif = 0

    # determine whether it's a JPEG or TIFF
    data = f.read(12)
    if data[0:4] in [b'II*\x00', b'MM\x00*']:
        # it's a TIFF file
        logger.debug("TIFF format recognized in data[0:4]")
        f.seek(0)
        endian = f.read(1)
        f.read(1)
        offset = 0
    elif data[0:2] == b'\xFF\xD8':
        # it's a JPEG file
        logger.debug("JPEG format recognized data[0:2]=0x%X%X", ord_(data[0]), ord_(data[1]))
        base = 2
        logger.debug("data[2]=0x%X data[3]=0x%X data[6:10]=%s",
                     ord_(data[2]), ord_(data[3]), data[6:10])
        # Skip leading non-EXIF APP segments (JFIF, JFXX, ...) to reach
        # the segment that carries the EXIF data.
        while ord_(data[2]) == 0xFF and data[6:10] in (b'JFIF', b'JFXX', b'OLYM', b'Phot'):
            length = ord_(data[4]) * 256 + ord_(data[5])
            logger.debug(" Length offset is %s", length)
            f.read(length - 8)
            # fake an EXIF beginning of file
            # I don't think this is used. --gd
            data = b'\xFF\x00' + f.read(10)
            fake_exif = 1
            if base > 2:
                logger.debug(" Added to base")
                base = base + length + 4 - 2
            else:
                logger.debug(" Added to zero")
                base = length + 4
            logger.debug(" Set segment base to 0x%X", base)

        # Big ugly patch to deal with APP2 (or other) data coming before APP1
        f.seek(0)
        # in theory, this could be insufficient since 64K is the maximum size--gd
        data = f.read(base + 4000)
        # base = 2
        # Walk the marker segments until the EXIF APP1 segment (or the
        # start of the image data) is found.
        while 1:
            logger.debug(" Segment base 0x%X", base)
            if data[base:base + 2] == b'\xFF\xE1':
                # APP1
                logger.debug(" APP1 at base 0x%X", base)
                logger.debug(" Length: 0x%X 0x%X", ord_(data[base + 2]),
                             ord_(data[base + 3]))
                logger.debug(" Code: %s", data[base + 4:base + 8])
                if data[base + 4:base + 8] == b"Exif":
                    logger.debug(" Decrement base by 2 to get to pre-segment header (for compatibility with later code)")
                    base -= 2
                    break
                increment = increment_base(data, base)
                logger.debug(" Increment base by %s", increment)
                base += increment
            elif data[base:base + 2] == b'\xFF\xE0':
                # APP0
                logger.debug(" APP0 at base 0x%X", base)
                logger.debug(" Length: 0x%X 0x%X", ord_(data[base + 2]),
                             ord_(data[base + 3]))
                logger.debug(" Code: %s", data[base + 4:base + 8])
                increment = increment_base(data, base)
                logger.debug(" Increment base by %s", increment)
                base += increment
            elif data[base:base + 2] == b'\xFF\xE2':
                # APP2
                logger.debug(" APP2 at base 0x%X", base)
                logger.debug(" Length: 0x%X 0x%X", ord_(data[base + 2]),
                             ord_(data[base + 3]))
                logger.debug(" Code: %s", data[base + 4:base + 8])
                increment = increment_base(data, base)
                logger.debug(" Increment base by %s", increment)
                base += increment
            elif data[base:base + 2] == b'\xFF\xEE':
                # APP14
                logger.debug(" APP14 Adobe segment at base 0x%X", base)
                logger.debug(" Length: 0x%X 0x%X", ord_(data[base + 2]),
                             ord_(data[base + 3]))
                logger.debug(" Code: %s", data[base + 4:base + 8])
                increment = increment_base(data, base)
                logger.debug(" Increment base by %s", increment)
                base += increment
                logger.debug(" There is useful EXIF-like data here, but we have no parser for it.")
            elif data[base:base + 2] == b'\xFF\xDB':
                # 0xFFDB: quantization table, i.e. start of the image data
                logger.debug(" JPEG image data at base 0x%X No more segments are expected.",
                             base)
                break
            elif data[base:base + 2] == b'\xFF\xD8':
                # APP12
                logger.debug(" FFD8 segment at base 0x%X", base)
                logger.debug(" Got 0x%X 0x%X and %s instead",
                             ord_(data[base]),
                             ord_(data[base + 1]),
                             data[4 + base:10 + base])
                logger.debug(" Length: 0x%X 0x%X", ord_(data[base + 2]),
                             ord_(data[base + 3]))
                logger.debug(" Code: %s", data[base + 4:base + 8])
                increment = increment_base(data, base)
                logger.debug(" Increment base by %s", increment)
                base += increment
            elif data[base:base + 2] == b'\xFF\xEC':
                # APP12
                logger.debug(" APP12 XMP (Ducky) or Pictureinfo segment at base 0x%X",
                             base)
                logger.debug(" Got 0x%X and 0x%X instead", ord_(data[base]),
                             ord_(data[base + 1]))
                logger.debug(" Length: 0x%X 0x%X",
                             ord_(data[base + 2]),
                             ord_(data[base + 3]))
                logger.debug("Code: %s", data[base + 4:base + 8])
                increment = increment_base(data, base)
                logger.debug(" Increment base by %s", increment)
                base += increment
                logger.debug(
                    " There is useful EXIF-like data here (quality, comment, copyright), but we have no parser for it.")
            else:
                # Unknown marker: try to skip it; stop if the length
                # field lies beyond the buffered data.
                try:
                    increment = increment_base(data, base)
                    logger.debug(" Got 0x%X and 0x%X instead",
                                 ord_(data[base]),
                                 ord_(data[base + 1]))
                except IndexError:
                    logger.debug(" Unexpected/unhandled segment type or file content.")
                    return {}
                else:
                    logger.debug(" Increment base by %s", increment)
                    base += increment

        # Position past the segment header so f.tell()/f.read(1) below
        # land on the TIFF header inside the APP1 payload.
        f.seek(base + 12)
        if ord_(data[2 + base]) == 0xFF and data[6 + base:10 + base] == b'Exif':
            # detected EXIF header
            offset = f.tell()
            endian = f.read(1)
            #HACK TEST: endian = 'M'
        elif ord_(data[2 + base]) == 0xFF and data[6 + base:10 + base + 1] == b'Ducky':
            # detected Ducky header.
            logger.debug("EXIF-like header (normally 0xFF and code): 0x%X and %s",
                         ord_(data[2 + base]), data[6 + base:10 + base + 1])
            offset = f.tell()
            endian = f.read(1)
        elif ord_(data[2 + base]) == 0xFF and data[6 + base:10 + base + 1] == b'Adobe':
            # detected APP14 (Adobe)
            logger.debug("EXIF-like header (normally 0xFF and code): 0x%X and %s",
                         ord_(data[2 + base]), data[6 + base:10 + base + 1])
            offset = f.tell()
            endian = f.read(1)
        else:
            # no EXIF information
            logger.debug("No EXIF header expected data[2+base]==0xFF and data[6+base:10+base]===Exif (or Duck)")
            logger.debug("Did get 0x%X and %s",
                         ord_(data[2 + base]), data[6 + base:10 + base + 1])
            return {}
    else:
        # file format not recognized
        logger.debug("File format not recognized.")
        return {}

    # Byte-order marker: 'I' = Intel/little-endian, 'M' = Motorola/big-endian.
    endian = chr(ord_(endian[0]))
    # deal with the EXIF info we found
    logger.debug("Endian format is %s (%s)", endian, {
        'I': 'Intel',
        'M': 'Motorola',
        '\x01': 'Adobe Ducky',
        'd': 'XMP/Adobe unknown'
    }[endian])

    hdr = ExifHeader(f, endian, offset, fake_exif, strict, debug, details, truncate_tags)
    ifd_list = hdr.list_ifd()
    thumb_ifd = False
    ctr = 0
    for ifd in ifd_list:
        # IFD0 holds the main image tags, IFD1 (if any) the thumbnail.
        if ctr == 0:
            ifd_name = 'Image'
        elif ctr == 1:
            ifd_name = 'Thumbnail'
            thumb_ifd = ifd
        else:
            ifd_name = 'IFD %d' % ctr
        logger.debug('IFD %d (%s) at offset %s:', ctr, ifd_name, ifd)
        hdr.dump_ifd(ifd, ifd_name, stop_tag=stop_tag)
        ctr += 1
    # EXIF IFD
    exif_off = hdr.tags.get('Image ExifOffset')
    if exif_off:
        logger.debug('Exif SubIFD at offset %s:', exif_off.values[0])
        hdr.dump_ifd(exif_off.values[0], 'EXIF', stop_tag=stop_tag)

    # deal with MakerNote contained in EXIF IFD
    # (Some apps use MakerNote tags but do not use a format for which we
    # have a description, do not process these).
    if details and 'EXIF MakerNote' in hdr.tags and 'Image Make' in hdr.tags:
        hdr.decode_maker_note()

    # extract thumbnails
    if details and thumb_ifd:
        hdr.extract_tiff_thumbnail(thumb_ifd)
        hdr.extract_jpeg_thumbnail()

    # parse XMP tags (experimental)
    if debug and details:
        xmp_string = b''
        # Easy we already have them
        if 'Image ApplicationNotes' in hdr.tags:
            logger.debug('XMP present in Exif')
            xmp_string = make_string(hdr.tags['Image ApplicationNotes'].values)
        # We need to look in the entire file for the XML
        else:
            logger.debug('XMP not in Exif, searching file for XMP info...')
            xml_started = False
            xml_finished = False
            for line in f:
                open_tag = line.find(b'<x:xmpmeta')
                close_tag = line.find(b'</x:xmpmeta>')

                if open_tag != -1:
                    xml_started = True
                    line = line[open_tag:]
                    logger.debug('XMP found opening tag at line position %s' % open_tag)

                if close_tag != -1:
                    logger.debug('XMP found closing tag at line position %s' % close_tag)
                    line_offset = 0
                    if open_tag != -1:
                        line_offset = open_tag
                    # Keep through the end of '</x:xmpmeta>' (12 characters).
                    line = line[:(close_tag - line_offset) + 12]
                    xml_finished = True

                if xml_started:
                    xmp_string += line

                if xml_finished:
                    break

            logger.debug('XMP Finished searching for info')
            if xmp_string:
                hdr.parse_xmp(xmp_string)

    return hdr.tags
|
|
@ -0,0 +1,560 @@
|
|||
import struct
|
||||
import re
|
||||
|
||||
from .exif_log import get_logger
|
||||
from .utils import s2n_motorola, s2n_intel, Ratio
|
||||
from .tags import *
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
# Python 3 removed `basestring`; alias it to `str` so code written for
# Python 2/3 compatibility keeps working.
try:
    basestring
except NameError:
    basestring = str
|
||||
|
||||
class IfdTag:
    """
    Eases dealing with tags.
    """

    def __init__(self, printable, tag, field_type, values, field_offset,
                 field_length):
        # printable version of data
        self.printable = printable
        # tag ID number
        self.tag = tag
        # field type as index into FIELD_TYPES
        self.field_type = field_type
        # offset of start of field in bytes from beginning of IFD
        self.field_offset = field_offset
        # length of data field in bytes
        self.field_length = field_length
        # either a string or array of data items
        self.values = values

    def __str__(self):
        return self.printable

    def __repr__(self):
        try:
            s = '(0x%04X) %s=%s @ %d' % (self.tag,
                                         FIELD_TYPES[self.field_type][2],
                                         self.printable,
                                         self.field_offset)
        except Exception:
            # Was a bare `except:`, which also swallows KeyboardInterrupt
            # and SystemExit; catch Exception only. The fallback handles
            # tag/offset that are not ints (so %X/%d formatting failed).
            s = '(%s) %s=%s @ %s' % (str(self.tag),
                                     FIELD_TYPES[self.field_type][2],
                                     self.printable,
                                     str(self.field_offset))
        return s
|
||||
|
||||
|
||||
class ExifHeader:
|
||||
"""
|
||||
Handle an EXIF header.
|
||||
"""
|
||||
    def __init__(self, file, endian, offset, fake_exif, strict,
                 debug=False, detailed=True, truncate_tags=True):
        # Open file object positioned within the image data.
        self.file = file
        # 'I' (Intel, little-endian) or 'M' (Motorola, big-endian);
        # selects s2n_intel vs s2n_motorola in s2n().
        self.endian = endian
        # Base file offset: tag offsets in s2n() are relative to this.
        self.offset = offset
        # Set when process_file() synthesized a fake EXIF prefix.
        self.fake_exif = fake_exif
        # NOTE(review): presumably toggles strict error handling during
        # tag decoding — confirm against dump_ifd/decode_maker_note.
        self.strict = strict
        self.debug = debug
        # When false, callers skip expensive work (thumbnails, MakerNote).
        self.detailed = detailed
        # NOTE(review): presumably limits printable tag length — confirm.
        self.truncate_tags = truncate_tags
        # Tag name -> IfdTag, e.g. 'Image ExifOffset'; filled by dump_ifd().
        self.tags = {}
|
||||
|
||||
def s2n(self, offset, length, signed=0):
|
||||
"""
|
||||
Convert slice to integer, based on sign and endian flags.
|
||||
|
||||
Usually this offset is assumed to be relative to the beginning of the
|
||||
start of the EXIF information.
|
||||
For some cameras that use relative tags, this offset may be relative
|
||||
to some other starting point.
|
||||
"""
|
||||
self.file.seek(self.offset + offset)
|
||||
sliced = self.file.read(length)
|
||||
if self.endian == 'I':
|
||||
val = s2n_intel(sliced)
|
||||
else:
|
||||
val = s2n_motorola(sliced)
|
||||
# Sign extension?
|
||||
if signed:
|
||||
msb = 1 << (8 * length - 1)
|
||||
if val & msb:
|
||||
val -= (msb << 1)
|
||||
return val
|
||||
|
||||
def n2s(self, offset, length):
|
||||
"""Convert offset to string."""
|
||||
s = ''
|
||||
for dummy in range(length):
|
||||
if self.endian == 'I':
|
||||
s += chr(offset & 0xFF)
|
||||
else:
|
||||
s = chr(offset & 0xFF) + s
|
||||
offset = offset >> 8
|
||||
return s
|
||||
|
||||
def _first_ifd(self):
|
||||
"""Return first IFD."""
|
||||
return self.s2n(4, 4)
|
||||
|
||||
def _next_ifd(self, ifd):
|
||||
"""Return the pointer to next IFD."""
|
||||
entries = self.s2n(ifd, 2)
|
||||
next_ifd = self.s2n(ifd + 2 + 12 * entries, 4)
|
||||
if next_ifd == ifd:
|
||||
return 0
|
||||
else:
|
||||
return next_ifd
|
||||
|
||||
def list_ifd(self):
|
||||
"""Return the list of IFDs in the header."""
|
||||
i = self._first_ifd()
|
||||
ifds = []
|
||||
while i:
|
||||
ifds.append(i)
|
||||
i = self._next_ifd(i)
|
||||
return ifds
|
||||
|
||||
def dump_ifd(self, ifd, ifd_name, tag_dict=EXIF_TAGS, relative=0, stop_tag=DEFAULT_STOP_TAG):
|
||||
"""
|
||||
Return a list of entries in the given IFD.
|
||||
"""
|
||||
# make sure we can process the entries
|
||||
try:
|
||||
entries = self.s2n(ifd, 2)
|
||||
except TypeError:
|
||||
logger.warning("Possibly corrupted IFD: %s" % ifd)
|
||||
return
|
||||
|
||||
for i in range(entries):
|
||||
# entry is index of start of this IFD in the file
|
||||
entry = ifd + 2 + 12 * i
|
||||
tag = self.s2n(entry, 2)
|
||||
|
||||
# get tag name early to avoid errors, help debug
|
||||
tag_entry = tag_dict.get(tag)
|
||||
if tag_entry:
|
||||
tag_name = tag_entry[0]
|
||||
else:
|
||||
tag_name = 'Tag 0x%04X' % tag
|
||||
|
||||
# ignore certain tags for faster processing
|
||||
if not (not self.detailed and tag in IGNORE_TAGS):
|
||||
field_type = self.s2n(entry + 2, 2)
|
||||
|
||||
# unknown field type
|
||||
if not 0 < field_type < len(FIELD_TYPES):
|
||||
if not self.strict:
|
||||
continue
|
||||
else:
|
||||
raise ValueError('Unknown type %d in tag 0x%04X' % (field_type, tag))
|
||||
|
||||
type_length = FIELD_TYPES[field_type][0]
|
||||
count = self.s2n(entry + 4, 4)
|
||||
# Adjust for tag id/type/count (2+2+4 bytes)
|
||||
# Now we point at either the data or the 2nd level offset
|
||||
offset = entry + 8
|
||||
|
||||
# If the value fits in 4 bytes, it is inlined, else we
|
||||
# need to jump ahead again.
|
||||
if count * type_length > 4:
|
||||
# offset is not the value; it's a pointer to the value
|
||||
# if relative we set things up so s2n will seek to the right
|
||||
# place when it adds self.offset. Note that this 'relative'
|
||||
# is for the Nikon type 3 makernote. Other cameras may use
|
||||
# other relative offsets, which would have to be computed here
|
||||
# slightly differently.
|
||||
if relative:
|
||||
tmp_offset = self.s2n(offset, 4)
|
||||
offset = tmp_offset + ifd - 8
|
||||
if self.fake_exif:
|
||||
offset += 18
|
||||
else:
|
||||
offset = self.s2n(offset, 4)
|
||||
|
||||
field_offset = offset
|
||||
values = None
|
||||
if field_type == 2:
|
||||
# special case: null-terminated ASCII string
|
||||
# XXX investigate
|
||||
# sometimes gets too big to fit in int value
|
||||
if count != 0: # and count < (2**31): # 2E31 is hardware dependant. --gd
|
||||
file_position = self.offset + offset
|
||||
try:
|
||||
self.file.seek(file_position)
|
||||
values = self.file.read(count)
|
||||
|
||||
# Drop any garbage after a null.
|
||||
values = values.split(b'\x00', 1)[0]
|
||||
if isinstance(values, bytes):
|
||||
try:
|
||||
values = values.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
logger.warning("Possibly corrupted field %s in %s IFD", tag_name, ifd_name)
|
||||
except OverflowError:
|
||||
logger.warn('OverflowError at position: %s, length: %s', file_position, count)
|
||||
values = ''
|
||||
except MemoryError:
|
||||
logger.warn('MemoryError at position: %s, length: %s', file_position, count)
|
||||
values = ''
|
||||
else:
|
||||
values = ''
|
||||
else:
|
||||
values = []
|
||||
signed = (field_type in [6, 8, 9, 10])
|
||||
|
||||
# XXX investigate
|
||||
# some entries get too big to handle could be malformed
|
||||
# file or problem with self.s2n
|
||||
if count < 1000:
|
||||
for dummy in range(count):
|
||||
if field_type in (5, 10):
|
||||
# a ratio
|
||||
value = Ratio(self.s2n(offset, 4, signed),
|
||||
self.s2n(offset + 4, 4, signed))
|
||||
else:
|
||||
value = self.s2n(offset, type_length, signed)
|
||||
values.append(value)
|
||||
offset = offset + type_length
|
||||
# The test above causes problems with tags that are
|
||||
# supposed to have long values! Fix up one important case.
|
||||
elif tag_name in ('MakerNote', makernote.canon.CAMERA_INFO_TAG_NAME):
|
||||
for dummy in range(count):
|
||||
value = self.s2n(offset, type_length, signed)
|
||||
values.append(value)
|
||||
offset = offset + type_length
|
||||
|
||||
# now 'values' is either a string or an array
|
||||
if count == 1 and field_type != 2:
|
||||
printable = str(values[0])
|
||||
elif count > 50 and len(values) > 20 and not isinstance(values, basestring) :
|
||||
if self.truncate_tags :
|
||||
printable = str(values[0:20])[0:-1] + ", ... ]"
|
||||
else:
|
||||
printable = str(values[0:-1])
|
||||
else:
|
||||
try:
|
||||
printable = str(values)
|
||||
except UnicodeEncodeError:
|
||||
printable = unicode(values)
|
||||
# compute printable version of values
|
||||
if tag_entry:
|
||||
# optional 2nd tag element is present
|
||||
if len(tag_entry) != 1:
|
||||
if callable(tag_entry[1]):
|
||||
# call mapping function
|
||||
printable = tag_entry[1](values)
|
||||
elif type(tag_entry[1]) is tuple:
|
||||
ifd_info = tag_entry[1]
|
||||
try:
|
||||
logger.debug('%s SubIFD at offset %d:', ifd_info[0], values[0])
|
||||
self.dump_ifd(values[0], ifd_info[0], tag_dict=ifd_info[1], stop_tag=stop_tag)
|
||||
except IndexError:
|
||||
logger.warn('No values found for %s SubIFD', ifd_info[0])
|
||||
else:
|
||||
printable = ''
|
||||
for i in values:
|
||||
# use lookup table for this tag
|
||||
printable += tag_entry[1].get(i, repr(i))
|
||||
|
||||
self.tags[ifd_name + ' ' + tag_name] = IfdTag(printable, tag,
|
||||
field_type,
|
||||
values, field_offset,
|
||||
count * type_length)
|
||||
try:
|
||||
tag_value = repr(self.tags[ifd_name + ' ' + tag_name])
|
||||
# fix for python2's handling of unicode values
|
||||
except UnicodeEncodeError:
|
||||
tag_value = unicode(self.tags[ifd_name + ' ' + tag_name])
|
||||
logger.debug(' %s: %s', tag_name, tag_value)
|
||||
|
||||
if tag_name == stop_tag:
|
||||
break
|
||||
|
||||
def extract_tiff_thumbnail(self, thumb_ifd):
|
||||
"""
|
||||
Extract uncompressed TIFF thumbnail.
|
||||
|
||||
Take advantage of the pre-existing layout in the thumbnail IFD as
|
||||
much as possible
|
||||
"""
|
||||
thumb = self.tags.get('Thumbnail Compression')
|
||||
if not thumb or thumb.printable != 'Uncompressed TIFF':
|
||||
return
|
||||
|
||||
entries = self.s2n(thumb_ifd, 2)
|
||||
# this is header plus offset to IFD ...
|
||||
if self.endian == 'M':
|
||||
tiff = 'MM\x00*\x00\x00\x00\x08'
|
||||
else:
|
||||
tiff = 'II*\x00\x08\x00\x00\x00'
|
||||
# ... plus thumbnail IFD data plus a null "next IFD" pointer
|
||||
self.file.seek(self.offset + thumb_ifd)
|
||||
tiff += self.file.read(entries * 12 + 2) + '\x00\x00\x00\x00'
|
||||
|
||||
# fix up large value offset pointers into data area
|
||||
for i in range(entries):
|
||||
entry = thumb_ifd + 2 + 12 * i
|
||||
tag = self.s2n(entry, 2)
|
||||
field_type = self.s2n(entry + 2, 2)
|
||||
type_length = FIELD_TYPES[field_type][0]
|
||||
count = self.s2n(entry + 4, 4)
|
||||
old_offset = self.s2n(entry + 8, 4)
|
||||
# start of the 4-byte pointer area in entry
|
||||
ptr = i * 12 + 18
|
||||
# remember strip offsets location
|
||||
if tag == 0x0111:
|
||||
strip_off = ptr
|
||||
strip_len = count * type_length
|
||||
# is it in the data area?
|
||||
if count * type_length > 4:
|
||||
# update offset pointer (nasty "strings are immutable" crap)
|
||||
# should be able to say "tiff[ptr:ptr+4]=newoff"
|
||||
newoff = len(tiff)
|
||||
tiff = tiff[:ptr] + self.n2s(newoff, 4) + tiff[ptr + 4:]
|
||||
# remember strip offsets location
|
||||
if tag == 0x0111:
|
||||
strip_off = newoff
|
||||
strip_len = 4
|
||||
# get original data and store it
|
||||
self.file.seek(self.offset + old_offset)
|
||||
tiff += self.file.read(count * type_length)
|
||||
|
||||
# add pixel strips and update strip offset info
|
||||
old_offsets = self.tags['Thumbnail StripOffsets'].values
|
||||
old_counts = self.tags['Thumbnail StripByteCounts'].values
|
||||
for i in range(len(old_offsets)):
|
||||
# update offset pointer (more nasty "strings are immutable" crap)
|
||||
offset = self.n2s(len(tiff), strip_len)
|
||||
tiff = tiff[:strip_off] + offset + tiff[strip_off + strip_len:]
|
||||
strip_off += strip_len
|
||||
# add pixel strip to end
|
||||
self.file.seek(self.offset + old_offsets[i])
|
||||
tiff += self.file.read(old_counts[i])
|
||||
|
||||
self.tags['TIFFThumbnail'] = tiff
|
||||
|
||||
def extract_jpeg_thumbnail(self):
|
||||
"""
|
||||
Extract JPEG thumbnail.
|
||||
|
||||
(Thankfully the JPEG data is stored as a unit.)
|
||||
"""
|
||||
thumb_offset = self.tags.get('Thumbnail JPEGInterchangeFormat')
|
||||
if thumb_offset:
|
||||
self.file.seek(self.offset + thumb_offset.values[0])
|
||||
size = self.tags['Thumbnail JPEGInterchangeFormatLength'].values[0]
|
||||
self.tags['JPEGThumbnail'] = self.file.read(size)
|
||||
|
||||
# Sometimes in a TIFF file, a JPEG thumbnail is hidden in the MakerNote
|
||||
# since it's not allowed in a uncompressed TIFF IFD
|
||||
if 'JPEGThumbnail' not in self.tags:
|
||||
thumb_offset = self.tags.get('MakerNote JPEGThumbnail')
|
||||
if thumb_offset:
|
||||
self.file.seek(self.offset + thumb_offset.values[0])
|
||||
self.tags['JPEGThumbnail'] = self.file.read(thumb_offset.field_length)
|
||||
|
||||
def decode_maker_note(self):
|
||||
"""
|
||||
Decode all the camera-specific MakerNote formats
|
||||
|
||||
Note is the data that comprises this MakerNote.
|
||||
The MakerNote will likely have pointers in it that point to other
|
||||
parts of the file. We'll use self.offset as the starting point for
|
||||
most of those pointers, since they are relative to the beginning
|
||||
of the file.
|
||||
If the MakerNote is in a newer format, it may use relative addressing
|
||||
within the MakerNote. In that case we'll use relative addresses for
|
||||
the pointers.
|
||||
As an aside: it's not just to be annoying that the manufacturers use
|
||||
relative offsets. It's so that if the makernote has to be moved by the
|
||||
picture software all of the offsets don't have to be adjusted. Overall,
|
||||
this is probably the right strategy for makernotes, though the spec is
|
||||
ambiguous.
|
||||
The spec does not appear to imagine that makernotes would
|
||||
follow EXIF format internally. Once they did, it's ambiguous whether
|
||||
the offsets should be from the header at the start of all the EXIF info,
|
||||
or from the header at the start of the makernote.
|
||||
"""
|
||||
note = self.tags['EXIF MakerNote']
|
||||
|
||||
# Some apps use MakerNote tags but do not use a format for which we
|
||||
# have a description, so just do a raw dump for these.
|
||||
make = self.tags['Image Make'].printable
|
||||
|
||||
# Nikon
|
||||
# The maker note usually starts with the word Nikon, followed by the
|
||||
# type of the makernote (1 or 2, as a short). If the word Nikon is
|
||||
# not at the start of the makernote, it's probably type 2, since some
|
||||
# cameras work that way.
|
||||
if 'NIKON' in make:
|
||||
if note.values[0:7] == [78, 105, 107, 111, 110, 0, 1]:
|
||||
logger.debug("Looks like a type 1 Nikon MakerNote.")
|
||||
self.dump_ifd(note.field_offset + 8, 'MakerNote',
|
||||
tag_dict=makernote.nikon.TAGS_OLD)
|
||||
elif note.values[0:7] == [78, 105, 107, 111, 110, 0, 2]:
|
||||
logger.debug("Looks like a labeled type 2 Nikon MakerNote")
|
||||
if note.values[12:14] != [0, 42] and note.values[12:14] != [42, 0]:
|
||||
raise ValueError("Missing marker tag '42' in MakerNote.")
|
||||
# skip the Makernote label and the TIFF header
|
||||
self.dump_ifd(note.field_offset + 10 + 8, 'MakerNote',
|
||||
tag_dict=makernote.nikon.TAGS_NEW, relative=1)
|
||||
else:
|
||||
# E99x or D1
|
||||
logger.debug("Looks like an unlabeled type 2 Nikon MakerNote")
|
||||
self.dump_ifd(note.field_offset, 'MakerNote',
|
||||
tag_dict=makernote.nikon.TAGS_NEW)
|
||||
return
|
||||
|
||||
# Olympus
|
||||
if make.startswith('OLYMPUS'):
|
||||
self.dump_ifd(note.field_offset + 8, 'MakerNote',
|
||||
tag_dict=makernote.olympus.TAGS)
|
||||
# TODO
|
||||
#for i in (('MakerNote Tag 0x2020', makernote.OLYMPUS_TAG_0x2020),):
|
||||
# self.decode_olympus_tag(self.tags[i[0]].values, i[1])
|
||||
#return
|
||||
|
||||
# Casio
|
||||
if 'CASIO' in make or 'Casio' in make:
|
||||
self.dump_ifd(note.field_offset, 'MakerNote',
|
||||
tag_dict=makernote.casio.TAGS)
|
||||
return
|
||||
|
||||
# Fujifilm
|
||||
if make == 'FUJIFILM':
|
||||
# bug: everything else is "Motorola" endian, but the MakerNote
|
||||
# is "Intel" endian
|
||||
endian = self.endian
|
||||
self.endian = 'I'
|
||||
# bug: IFD offsets are from beginning of MakerNote, not
|
||||
# beginning of file header
|
||||
offset = self.offset
|
||||
self.offset += note.field_offset
|
||||
# process note with bogus values (note is actually at offset 12)
|
||||
self.dump_ifd(12, 'MakerNote', tag_dict=makernote.fujifilm.TAGS)
|
||||
# reset to correct values
|
||||
self.endian = endian
|
||||
self.offset = offset
|
||||
return
|
||||
|
||||
# Apple
|
||||
if make == 'Apple' and \
|
||||
note.values[0:10] == [65, 112, 112, 108, 101, 32, 105, 79, 83, 0]:
|
||||
t = self.offset
|
||||
self.offset += note.field_offset+14
|
||||
self.dump_ifd(0, 'MakerNote',
|
||||
tag_dict=makernote.apple.TAGS)
|
||||
self.offset = t
|
||||
return
|
||||
|
||||
# Canon
|
||||
if make == 'Canon':
|
||||
self.dump_ifd(note.field_offset, 'MakerNote',
|
||||
tag_dict=makernote.canon.TAGS)
|
||||
|
||||
for i in (('MakerNote Tag 0x0001', makernote.canon.CAMERA_SETTINGS),
|
||||
('MakerNote Tag 0x0002', makernote.canon.FOCAL_LENGTH),
|
||||
('MakerNote Tag 0x0004', makernote.canon.SHOT_INFO),
|
||||
('MakerNote Tag 0x0026', makernote.canon.AF_INFO_2),
|
||||
('MakerNote Tag 0x0093', makernote.canon.FILE_INFO)):
|
||||
if i[0] in self.tags:
|
||||
logger.debug('Canon ' + i[0])
|
||||
self._canon_decode_tag(self.tags[i[0]].values, i[1])
|
||||
del self.tags[i[0]]
|
||||
if makernote.canon.CAMERA_INFO_TAG_NAME in self.tags:
|
||||
tag = self.tags[makernote.canon.CAMERA_INFO_TAG_NAME]
|
||||
logger.debug('Canon CameraInfo')
|
||||
self._canon_decode_camera_info(tag)
|
||||
del self.tags[makernote.canon.CAMERA_INFO_TAG_NAME]
|
||||
return
|
||||
|
||||
def _olympus_decode_tag(self, value, mn_tags):
|
||||
""" TODO Decode Olympus MakerNote tag based on offset within tag."""
|
||||
pass
|
||||
|
||||
def _canon_decode_tag(self, value, mn_tags):
|
||||
"""
|
||||
Decode Canon MakerNote tag based on offset within tag.
|
||||
|
||||
See http://www.burren.cx/david/canon.html by David Burren
|
||||
"""
|
||||
for i in range(1, len(value)):
|
||||
tag = mn_tags.get(i, ('Unknown', ))
|
||||
name = tag[0]
|
||||
if len(tag) > 1:
|
||||
val = tag[1].get(value[i], 'Unknown')
|
||||
else:
|
||||
val = value[i]
|
||||
try:
|
||||
logger.debug(" %s %s %s", i, name, hex(value[i]))
|
||||
except TypeError:
|
||||
logger.debug(" %s %s %s", i, name, value[i])
|
||||
|
||||
# it's not a real IFD Tag but we fake one to make everybody
|
||||
# happy. this will have a "proprietary" type
|
||||
self.tags['MakerNote ' + name] = IfdTag(str(val), None, 0, None,
|
||||
None, None)
|
||||
|
||||
def _canon_decode_camera_info(self, camera_info_tag):
|
||||
"""
|
||||
Decode the variable length encoded camera info section.
|
||||
"""
|
||||
model = self.tags.get('Image Model', None)
|
||||
if not model:
|
||||
return
|
||||
model = str(model.values)
|
||||
|
||||
camera_info_tags = None
|
||||
for (model_name_re, tag_desc) in makernote.canon.CAMERA_INFO_MODEL_MAP.items():
|
||||
if re.search(model_name_re, model):
|
||||
camera_info_tags = tag_desc
|
||||
break
|
||||
else:
|
||||
return
|
||||
|
||||
# We are assuming here that these are all unsigned bytes (Byte or
|
||||
# Unknown)
|
||||
if camera_info_tag.field_type not in (1, 7):
|
||||
return
|
||||
camera_info = struct.pack('<%dB' % len(camera_info_tag.values),
|
||||
*camera_info_tag.values)
|
||||
|
||||
# Look for each data value and decode it appropriately.
|
||||
for offset, tag in camera_info_tags.items():
|
||||
tag_format = tag[1]
|
||||
tag_size = struct.calcsize(tag_format)
|
||||
if len(camera_info) < offset + tag_size:
|
||||
continue
|
||||
packed_tag_value = camera_info[offset:offset + tag_size]
|
||||
tag_value = struct.unpack(tag_format, packed_tag_value)[0]
|
||||
|
||||
tag_name = tag[0]
|
||||
if len(tag) > 2:
|
||||
if callable(tag[2]):
|
||||
tag_value = tag[2](tag_value)
|
||||
else:
|
||||
tag_value = tag[2].get(tag_value, tag_value)
|
||||
logger.debug(" %s %s", tag_name, tag_value)
|
||||
|
||||
self.tags['MakerNote ' + tag_name] = IfdTag(str(tag_value), None,
|
||||
0, None, None, None)
|
||||
|
||||
def parse_xmp(self, xmp_string):
|
||||
import xml.dom.minidom
|
||||
|
||||
logger.debug('XMP cleaning data')
|
||||
|
||||
xml = xml.dom.minidom.parseString(xmp_string)
|
||||
pretty = xml.toprettyxml()
|
||||
cleaned = []
|
||||
for line in pretty.splitlines():
|
||||
if line.strip():
|
||||
cleaned.append(line)
|
||||
self.tags['Image ApplicationNotes'] = IfdTag('\n'.join(cleaned), None,
|
||||
1, None, None, None)
|
|
@ -0,0 +1,76 @@
|
|||
"""
|
||||
Custom log output
|
||||
"""
|
||||
|
||||
import sys
|
||||
import logging
|
||||
|
||||
TEXT_NORMAL = 0
|
||||
TEXT_BOLD = 1
|
||||
TEXT_RED = 31
|
||||
TEXT_GREEN = 32
|
||||
TEXT_YELLOW = 33
|
||||
TEXT_BLUE = 34
|
||||
TEXT_MAGENTA = 35
|
||||
TEXT_CYAN = 36
|
||||
|
||||
|
||||
def get_logger():
    """Return the shared 'exifread' logger instance."""
    return logging.getLogger('exifread')
|
||||
|
||||
|
||||
def setup_logger(debug, color):
    """Configure the 'exifread' logger with a (possibly colorized) stream handler."""
    log_level = logging.DEBUG if debug else logging.INFO

    exif_logger = logging.getLogger('exifread')
    exif_logger.addHandler(Handler(log_level, debug, color))
    exif_logger.setLevel(log_level)
|
||||
|
||||
|
||||
class Formatter(logging.Formatter):
    """Log formatter that optionally colorizes the level name with ANSI codes."""

    def __init__(self, debug=False, color=False):
        self.color = color
        self.debug = debug
        # Debug mode shows the level name; normal mode prints bare messages.
        fmt = '%(levelname)-6s %(message)s' if self.debug else '%(message)s'
        logging.Formatter.__init__(self, fmt)

    def format(self, record):
        if self.debug and self.color:
            # Highest matching threshold wins; fall through to plain text.
            level_colors = (
                (logging.CRITICAL, TEXT_RED),
                (logging.ERROR, TEXT_RED),
                (logging.WARNING, TEXT_YELLOW),
                (logging.INFO, TEXT_GREEN),
                (logging.DEBUG, TEXT_CYAN),
            )
            for threshold, ansi_code in level_colors:
                if record.levelno >= threshold:
                    break
            else:
                ansi_code = TEXT_NORMAL
            record.levelname = "\x1b[%sm%s\x1b[%sm" % (ansi_code, record.levelname, TEXT_NORMAL)
        return logging.Formatter.format(self, record)
|
||||
|
||||
|
||||
class Handler(logging.StreamHandler):
    """Stream handler bound to stdout that installs the custom Formatter."""

    def __init__(self, log_level, debug=False, color=False):
        self.color = color
        self.debug = debug
        logging.StreamHandler.__init__(self, sys.stdout)
        self.setFormatter(Formatter(debug, color))
        self.setLevel(log_level)
    # removed: a dead, commented-out emit() override that bolded every
    # message — it was kept as an inert string literal
|
|
@ -0,0 +1,116 @@
|
|||
"""
|
||||
Misc utilities.
|
||||
"""
|
||||
|
||||
from fractions import Fraction
|
||||
|
||||
|
||||
def ord_(dta):
    """Return the integer code of *dta*, accepting a 1-char string or an int."""
    return ord(dta) if isinstance(dta, str) else dta
|
||||
|
||||
|
||||
def make_string(seq):
    """
    Don't throw an exception when given an out of range character.

    *seq* may be bytes, a list of ints, or a str; characters outside the
    printable range 32..255 are screened out. Falls back to str(seq) when
    nothing printable remains.
    """
    string = ''
    for c in seq:
        # Normalize single-character strings (str iteration) to ints so the
        # printable-range check works for every input type; previously str
        # characters always hit the TypeError swallow and were dropped.
        if isinstance(c, str):
            c = ord(c)
        # Screen out non-printing characters
        try:
            if 32 <= c < 256:
                string += chr(c)
        except TypeError:
            pass
    # If no printing chars
    if not string:
        return str(seq)
    return string
|
||||
|
||||
|
||||
def make_string_uc(seq):
    """
    Special version to deal with the code in the first 8 bytes of a user comment.
    First 8 bytes gives coding system e.g. ASCII vs. JIS vs Unicode.
    """
    # Strip the 8-byte encoding marker, then decode the payload.
    # Of course, this is only correct if ASCII, and the standard explicitly
    # allows JIS and Unicode.
    return make_string(seq[8:])
|
||||
|
||||
|
||||
def s2n_motorola(string):
    """Extract multi-byte integer in Motorola format (big endian).

    (Docstring fixed: Motorola byte order is big-endian — most significant
    byte first — which is what this left-shift accumulation implements.)
    """
    x = 0
    for c in string:
        x = (x << 8) | ord_(c)
    return x
|
||||
|
||||
|
||||
def s2n_intel(string):
    """Extract multi-byte integer in Intel format (little endian).

    (Docstring fixed: Intel byte order is little-endian — least significant
    byte first — which is what the growing shift below implements.)
    """
    x = 0
    y = 0
    for c in string:
        x = x | (ord_(c) << y)
        y += 8
    return x
|
||||
|
||||
def get_gps_coords(tags):
    """Return (latitude, longitude) in decimal degrees from EXIF GPS tags.

    Returns None when any of the four required GPS tags is missing.
    """
    lng_ref_tag_name = "GPS GPSLongitudeRef"
    lng_tag_name = "GPS GPSLongitude"
    lat_ref_tag_name = "GPS GPSLatitudeRef"
    lat_tag_name = "GPS GPSLatitude"

    # Check if these tags are present.
    # BUG FIX: the list previously contained lat_tag_name twice and never
    # checked lat_ref_tag_name, so a missing latitude ref slipped through.
    gps_tags = [lng_ref_tag_name, lng_tag_name, lat_ref_tag_name, lat_tag_name]
    for tag in gps_tags:
        if tag not in tags:
            return None

    lng_ref_val = tags[lng_ref_tag_name].values
    lng_coord_val = [c.decimal() for c in tags[lng_tag_name].values]

    lat_ref_val = tags[lat_ref_tag_name].values
    lat_coord_val = [c.decimal() for c in tags[lat_tag_name].values]

    # Degrees / minutes / seconds -> decimal degrees.
    lng_coord = sum(c / 60 ** i for i, c in enumerate(lng_coord_val))
    # West longitudes are negative.
    lng_coord *= (-1) ** (lng_ref_val == "W")

    lat_coord = sum(c / 60 ** i for i, c in enumerate(lat_coord_val))
    # South latitudes are negative.
    lat_coord *= (-1) ** (lat_ref_val == "S")

    return (lat_coord, lng_coord)
|
||||
|
||||
class Ratio(Fraction):
    """
    Ratio object that eventually will be able to reduce itself to lowest
    common denominator for printing.
    """

    # We're immutable, so use __new__ not __init__
    def __new__(cls, numerator=0, denominator=None):
        try:
            self = super(Ratio, cls).__new__(cls, numerator, denominator)
        except ZeroDivisionError:
            # Keep the raw numerator/denominator for display even when the
            # EXIF data contains an (invalid) zero denominator.
            self = super(Ratio, cls).__new__(cls)
            self._numerator = numerator
            self._denominator = denominator
        return self
    # BUG FIX: was '__new__.doc = ...', which set a useless 'doc' attribute
    # instead of the actual docstring.
    __new__.__doc__ = Fraction.__new__.__doc__

    def __repr__(self):
        return str(self)

    @property
    def num(self):
        """Alias for the (possibly un-normalized) numerator."""
        return self.numerator

    @property
    def den(self):
        """Alias for the (possibly zero) denominator."""
        return self.denominator

    def decimal(self):
        """Return the ratio as a float."""
        return float(self)
|
|
@ -0,0 +1,126 @@
|
|||
import os.path
|
||||
import ntpath
|
||||
from time import localtime, strftime, strptime, mktime
|
||||
import shutil
|
||||
import exifread
|
||||
|
||||
unknownDateFolderName = "date-unknown"
|
||||
|
||||
def getMinimumCreationTime(exif_data):
    """Pick the best available EXIF creation timestamp.

    Preference order: 'DateTime', then 'EXIF DateTimeOriginal', then
    'EXIF DateTimeDigitized'. Returns None when none of them is present.
    """
    for key in ('DateTime', 'EXIF DateTimeOriginal', 'EXIF DateTimeDigitized'):
        stamp = exif_data.get(key)
        if stamp is not None:
            return stamp
    return None
|
||||
|
||||
def postprocessImage(images, imageDirectory, fileName):
    """Append a (creation-timestamp, path) tuple for one image to *images*.

    The timestamp comes from EXIF data when readable; otherwise the
    filesystem ctime is used as a fallback.
    """
    imagePath = os.path.join(imageDirectory, fileName)
    creationTime = None
    # 'with' guarantees the handle is closed even if EXIF parsing throws
    # (the original leaked the handle on those paths).
    with open(imagePath, 'rb') as image:
        try:
            exifTags = exifread.process_file(image, details=False)
            creationTime = getMinimumCreationTime(exifTags)
        except Exception:
            # narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit
            # still propagate
            print("invalid exif tags for " + fileName)

    # distinct different time types
    if creationTime is None:
        creationTime = localtime(os.path.getctime(imagePath))
    else:
        try:
            creationTime = strptime(str(creationTime), "%Y:%m:%d %H:%M:%S")
        except ValueError:
            # malformed EXIF timestamp string — fall back to filesystem ctime
            creationTime = localtime(os.path.getctime(imagePath))

    images.append((mktime(creationTime), imagePath))
|
||||
|
||||
def createPath(newPath):
    """Create *newPath* (including any missing parents) if it does not exist."""
    if os.path.exists(newPath):
        return
    os.makedirs(newPath)
|
||||
|
||||
# Pass None for month to create 'year/eventNumber' directories instead of 'year/month/eventNumber'.
def createNewFolder(destinationRoot, year, month, eventNumber):
    """Create the folder for one event under the destination root."""
    components = [destinationRoot, year]
    if month is not None:
        components.append(month)
    components.append(str(eventNumber))
    createPath(os.path.join(*components))
|
||||
|
||||
def createUnknownDateFolder(destinationRoot):
    """Ensure the fallback folder for images without a usable date exists."""
    createPath(os.path.join(destinationRoot, unknownDateFolderName))
|
||||
|
||||
def writeImages(images, destinationRoot, minEventDeltaDays, splitByMonth=False):
    """Move images into 'year[/month]/eventNumber' folders under destinationRoot.

    *images* is a list of (timestamp, path) tuples. A new event folder is
    started whenever the gap to the previous image exceeds
    *minEventDeltaDays*. Images whose timestamp equals today's date are
    assumed to have no real EXIF date and go into the unknown-date folder.
    Duplicate destinations cause the source file to be deleted instead of
    moved.
    """
    minEventDelta = minEventDeltaDays * 60 * 60 * 24 # convert in seconds
    # sorting by timestamp makes the event-gap detection below a single pass
    sortedImages = sorted(images)
    previousTime = None
    eventNumber = 0
    previousDestination = None
    today = strftime("%d/%m/%Y")

    for imageTuple in sortedImages:
        destination = ""
        destinationFilePath = ""
        t = localtime(imageTuple[0])
        year = strftime("%Y", t)
        # None (not a month string) when not splitting by month
        month = splitByMonth and strftime("%m", t) or None
        creationDate = strftime("%d/%m/%Y", t)
        fileName = ntpath.basename(imageTuple[1])

        # a creation date of "today" means the timestamp was a filesystem
        # fallback, not a real EXIF date — file it under the unknown folder
        if(creationDate == today):
            createUnknownDateFolder(destinationRoot)
            destination = os.path.join(destinationRoot, unknownDateFolderName)
            destinationFilePath = os.path.join(destination, fileName)

        else:
            # start a new event when the time gap to the previous image is
            # larger than the configured minimum delta
            if (previousTime == None) or ((previousTime + minEventDelta) < imageTuple[0]):
                eventNumber = eventNumber + 1
                createNewFolder(destinationRoot, year, month, eventNumber)

            previousTime = imageTuple[0]

            # month may be None; filter it out before joining
            destComponents = [destinationRoot, year, month, str(eventNumber)]
            destComponents = [v for v in destComponents if v is not None]
            destination = os.path.join(*destComponents)

            # it may be possible that an event covers 2 years.
            # in such a case put all the images to the event in the old year
            if not (os.path.exists(destination)):
                destination = previousDestination
                # destination = os.path.join(destinationRoot, str(int(year) - 1), str(eventNumber))

            previousDestination = destination
            destinationFilePath = os.path.join(destination, fileName)

        # move unless an identically-named file already exists; in that case
        # treat the source as a duplicate and remove it
        if not (os.path.exists(destinationFilePath)):
            shutil.move(imageTuple[1], destination)
        else:
            if (os.path.exists(imageTuple[1])):
                os.remove(imageTuple[1])
|
||||
|
||||
|
||||
def postprocessImages(imageDirectory, minEventDeltaDays, splitByMonth):
    """Collect timestamps for every file below *imageDirectory*, then sort them into event folders."""
    collected = []
    for _root, _dirs, fileNames in os.walk(imageDirectory):
        for fileName in fileNames:
            postprocessImage(collected, imageDirectory, fileName)

    writeImages(collected, imageDirectory, minEventDeltaDays, splitByMonth)
|
|
@ -0,0 +1,102 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
|
||||
"""
|
||||
Movie Spy Is a programm to quickly collect all personal information
|
||||
from a computer. Ideally run from a live USB stick. The name comes
|
||||
from the fact that in movies spies can just quickly insert a USB key
|
||||
and get all the important data.
|
||||
|
||||
Usage:
|
||||
moviespy <source> <destination>
|
||||
moviespy (-h | --help)
|
||||
moviespy --version
|
||||
|
||||
Options:
|
||||
-h --help Show this screen.
|
||||
--version Show version.
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import fnmatch
|
||||
from shutil import copy2
|
||||
|
||||
from docopt import docopt
|
||||
import jpgSorter
|
||||
import numberOfFilesPerFolderLimiter as max_files_per_folder
|
||||
|
||||
|
||||
# File extensions collected for each category folder created under the
# destination directory.
file_types = {
    'movies': ['mp4', 'mkv'],
    'documents': ['docx', 'xlsx', 'txt', 'doc', 'xls', 'pdf', 'odt', 'ods'],
    'pictures': ['jpg', 'png', 'gif'],
    'keys': ['key', 'kdbx', 'kdb', 'gpg']
}
# Tuning knobs for post-processing: folder-size splitting and jpg
# event-grouping (see jpgSorter / numberOfFilesPerFolderLimiter).
maxNumberOfFilesPerFolder = 500
splitMonths = True
minEventDeltaDays = 4
|
||||
|
||||
|
||||
def find(pattern, path):
    """Walk *path* and map each matching file's full path to its base name."""
    matches = {}
    for root, _dirs, files in os.walk(path):
        for name in fnmatch.filter(files, pattern):
            matches[os.path.join(root, name)] = name
    return matches
|
||||
|
||||
|
||||
def find_extensions(extensions, path):
    """Return {extension: {full_path: file_name}} for every extension given."""
    return {
        extension: find("*." + extension, path)
        for extension in extensions
    }
|
||||
|
||||
|
||||
def copy_files(extensions, search_path, dest):
    """Copy every non-symlink file with one of *extensions* found under
    *search_path* into dest/<extension>/, creating the folder as needed."""
    path_dictionary = find_extensions(extensions, search_path)

    for file_type, files_by_type in path_dictionary.items():
        if not files_by_type:
            continue
        path = os.path.join(dest, file_type)
        if not os.path.exists(path):
            os.makedirs(path)

        # BUG FIX: the original iterated files_by_type.items(), passing
        # (path, name) tuples to os.path.islink/copy2, which raises
        # TypeError. The dict keys are the full source paths.
        for file_path in files_by_type:
            if os.path.islink(file_path):
                continue
            try:
                copy2(file_path, path)
            except Exception as e:
                # best-effort copy: report and keep going
                print(e)
                continue
|
||||
|
||||
|
||||
def sort_jpgs(location):
    """Sort the jpg folder at *location* into date-based event folders,
    using the module-level tuning constants."""
    jpgSorter.postprocessImages(location,
                                minEventDeltaDays,
                                splitMonths)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # docopt parses the module docstring to validate the CLI arguments.
    arguments = docopt(__doc__, version='Movie Spy v1.0')
    source = arguments['<source>']
    destination = arguments['<destination>']

    for file_type, extensions in file_types.items():
        type_destination = os.path.join(destination, file_type)
        if not os.path.exists(type_destination):
            os.makedirs(type_destination)
        copy_files(extensions, source, type_destination)
        if 'jpg' in extensions:
            # BUG FIX: type_destination already includes the destination
            # prefix; joining destination in again produced a duplicated,
            # non-existent path for relative destinations.
            sort_jpgs(os.path.join(type_destination, "jpg"))

    max_files_per_folder.limitFilesPerFolder(destination,
                                             maxNumberOfFilesPerFolder)
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
import sys
|
||||
import math
|
||||
import os
|
||||
import shutil
|
||||
|
||||
|
||||
def limitFilesPerFolder(folder, maxNumberOfFilesPerFolder):
    """Split every directory under *folder* that holds more than
    *maxNumberOfFilesPerFolder* entries into numbered subdirectories
    ("1", "2", ...), moving at most that many files into each.

    Walks bottom-up so newly created subdirectories are not revisited.
    NOTE(review): the entry count uses len(os.listdir(...)), so existing
    subdirectories also count toward the limit — confirm that is intended.
    """
    # Renamed loop variables: the original shadowed the builtins `dir`/`file`.
    for root, dirs, files in os.walk(folder, topdown=False):
        for subdir in dirs:
            dirPath = os.path.join(root, subdir)
            filesInFolder = len(os.listdir(dirPath))
            if filesInFolder > maxNumberOfFilesPerFolder:
                # Ceiling division: enough numbered buckets for all entries.
                numberOfSubfolders = ((filesInFolder - 1) // maxNumberOfFilesPerFolder) + 1
                for subFolderNumber in range(1, numberOfSubfolders + 1):
                    subFolderPath = os.path.join(dirPath, str(subFolderNumber))
                    if not os.path.exists(subFolderPath):
                        os.mkdir(subFolderPath)
                fileCounter = 1
                for name in os.listdir(dirPath):
                    sourcePath = os.path.join(dirPath, name)
                    # Only plain files move; the numbered buckets stay put.
                    if os.path.isfile(sourcePath):
                        destDir = str(((fileCounter - 1) // maxNumberOfFilesPerFolder) + 1)
                        shutil.move(sourcePath, os.path.join(dirPath, destDir, name))
                        fileCounter += 1
|
||||
|
||||
|
|
@ -0,0 +1,126 @@
|
|||
#!/usr/bin/env python
|
||||
import os
|
||||
import os.path
|
||||
from time import localtime, strftime
|
||||
import shutil
|
||||
import jpgSorter
|
||||
import numberOfFilesPerFolderLimiter
|
||||
|
||||
|
||||
def getNumberOfFilesInFolderRecursively(start_path='.'):
    """Count regular files under *start_path*, descending into subdirectories.

    Only entries that are real files (per os.path.isfile) are counted;
    directories and broken symlinks are ignored.
    """
    total = 0
    for dirpath, _dirnames, filenames in os.walk(start_path):
        total += sum(
            1 for name in filenames
            if os.path.isfile(os.path.join(dirpath, name))
        )
    return total
|
||||
|
||||
|
||||
def getNumberOfFilesInFolder(path):
    """Return the number of directory entries directly inside *path*.

    Counts every entry os.listdir reports — subdirectories included.
    """
    entries = os.listdir(path)
    return len(entries)
|
||||
|
||||
|
||||
def log(logString):
    """Print *logString* to stdout, prefixed with the current HH:MM:SS time."""
    timestamp = strftime("%H:%M:%S", localtime())
    print(timestamp + ": " + logString)
|
||||
|
||||
|
||||
def moveFile(file, destination):
    """Copy *file* into an extension-named subdirectory of *destination*.

    NOTE(review): this function reads the module-level names ``root`` and
    ``fileCounter``, which only exist while the os.walk loop at the bottom
    of this module is running — it appears to duplicate that loop's body
    and is never called from the visible code; confirm whether it is dead.
    It also uses shutil.copy (no metadata) where the inline loop uses
    shutil.copy2 — confirm which behavior is intended.
    """
    # Uppercased extension without the leading dot, e.g. "JPG".
    extension = os.path.splitext(file)[1][1:].upper()
    # Depends on the module global `root` set by the walk loop.
    sourcePath = os.path.join(root, file)

    destinationDirectory = os.path.join(destination, extension)

    if not os.path.exists(destinationDirectory):
        os.mkdir(destinationDirectory)

    # Sequential rename based on the module global `fileCounter`.
    fileName = str(fileCounter) + "." + extension.lower()
    destinationFile = os.path.join(destinationDirectory, fileName)
    # Never overwrite an existing target.
    if not os.path.exists(destinationFile):
        shutil.copy(sourcePath, destinationFile)
|
||||
|
||||
|
||||
def get_args():
    """Build the CLI parser and return the parsed argparse namespace."""
    import argparse

    description = "\n".join([
        "Sort files recoverd by Photorec.",
        "The input files are first copied to the destination, sorted by file type.",
        "Then JPG files are sorted based on creation year (and optionally month).",
        "Finally any directories containing more than a maximum number of files are accordingly split into separate directories.",
    ])

    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('source', metavar='src', type=str, help='source directory with files recovered by Photorec')
    parser.add_argument('destination', metavar='dest', type=str, help='destination directory to write sorted files to')
    parser.add_argument('-n', '--max-per-dir', type=int, default=500, required=False, help='maximum number of files per directory')
    parser.add_argument('-m', '--split-months', action='store_true', required=False, help='split JPEG files not only by year but by month as well')
    parser.add_argument('-k', '--keep_filename', action='store_true', required=False, help='keeps the original filenames when copying')
    parser.add_argument('-d', '--min-event-delta', type=int, default=4, required=False, help='minimum delta in days between two days')

    return parser.parse_args()
|
||||
|
||||
|
||||
|
||||
# --- Defaults; immediately overwritten from the parsed CLI arguments below. ---
maxNumberOfFilesPerFolder = 500
splitMonths = False
source = None
destination = None
keepFilename = False


args = get_args()
source = args.source
destination = args.destination
maxNumberOfFilesPerFolder = args.max_per_dir
splitMonths = args.split_months
keepFilename = args.keep_filename
minEventDeltaDays = args.min_event_delta

print("Reading from source '%s', writing to destination '%s' (max %i files per directory, splitting by year %s)." %
      (source, destination, maxNumberOfFilesPerFolder, splitMonths and "and month" or "only"))
if keepFilename:
    print("I will keep you filenames as they are")
else:
    print("I will rename your files like '1.jpg'")

# argparse guarantees the arguments are present, but not that the paths exist.
while (source is None) or (not os.path.exists(source)):
    source = input('Enter a valid source directory\n')
while (destination is None) or (not os.path.exists(destination)):
    destination = input('Enter a valid destination directory\n')

fileNumber = getNumberOfFilesInFolderRecursively(source)
# Progress is logged every ~1% of files; 0 when fewer than 100 files exist.
onePercentFiles = int(fileNumber / 100)
totalAmountToCopy = str(fileNumber)
print("Files to copy: " + totalAmountToCopy)

# Copy every recovered file into destination/<EXTENSION>/, renaming to a
# running counter unless --keep_filename was given.
fileCounter = 0
for root, dirs, files in os.walk(source, topdown=False):
    for file in files:
        extension = os.path.splitext(file)[1][1:].upper()
        sourcePath = os.path.join(root, file)

        destinationDirectory = os.path.join(destination, extension)

        if not os.path.exists(destinationDirectory):
            os.mkdir(destinationDirectory)
        if keepFilename:
            fileName = file
        else:
            fileName = str(fileCounter) + "." + extension.lower()

        destinationFile = os.path.join(destinationDirectory, fileName)
        # Never overwrite an already-copied target.
        if not os.path.exists(destinationFile):
            shutil.copy2(sourcePath, destinationFile)

        fileCounter += 1
        # BUG FIX: the original wrote `(fileCounter % onePercentFiles) is 0`,
        # an identity comparison (SyntaxWarning on modern Python), and raised
        # ZeroDivisionError whenever the source held fewer than 100 files.
        if onePercentFiles and (fileCounter % onePercentFiles) == 0:
            log(str(fileCounter) + " / " + totalAmountToCopy + " processed.")

log("start special file treatment")
jpgSorter.postprocessImages(os.path.join(destination, "JPG"), minEventDeltaDays, splitMonths)

log("assure max file per folder number")
numberOfFilesPerFolderLimiter.limitFilesPerFolder(destination, maxNumberOfFilesPerFolder)
|
Reference in New Issue