# -*- coding: utf-8 -*-
"""
Decorators used in ObsPy.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import functools
import inspect
from pathlib import Path
import re
import socket
import tarfile
import unittest
import warnings
import zipfile
import numpy as np
from decorator import decorator
from obspy.core.util import get_example_file
from obspy.core.util.base import NamedTemporaryFile
from obspy.core.util.deprecation_helpers import ObsPyDeprecationWarning
def deprecated(warning_msg=None):
    """
    Mark a function as deprecated so that every call emits a warning.

    .. note::
        Strictly speaking this is a decorator factory: calling it returns
        the actual decorator configured with the given options. It is
        applied just like a regular decorator.

    The warning text is chosen at call time, in this order of precedence:

    1. the docstring of the wrapped function, if it contains the word
       "deprecated" (case-insensitive),
    2. ``warning_msg``, if given (it also replaces the docstring of the
       wrapped function),
    3. a generic message naming the function.

    :type warning_msg: str, optional
    :param warning_msg: Custom text for the deprecation warning.
    """
    @decorator
    def _deprecated(func, *args, **kwargs):
        docstring = func.__doc__
        if 'deprecated' in str(docstring).lower():
            # the docstring already announces the deprecation - reuse it
            text = docstring
        elif warning_msg:
            # custom message: also publish it as the function's docstring
            func.__doc__ = text = warning_msg
        else:
            text = "Call to deprecated function %s." % func.__name__
        warnings.warn(text, category=ObsPyDeprecationWarning, stacklevel=3)
        return func(*args, **kwargs)
    return _deprecated
def deprecated_keywords(keywords):
    """
    Decorator for marking keywords as deprecated.

    .. note::
        Actually, this is not a decorator itself but a decorator factory,
        returning the correct decorator for the specified options. It can be
        used just like a decorator.

    Every deprecated keyword found in a call triggers an
    ``ObsPyDeprecationWarning``. Its value is then passed on under the new
    keyword name, or dropped if the new name is ``None``.

    :type keywords: dict
    :param keywords: old/new keyword names as key/value pairs. Use ``None``
        as the new name for keywords that should simply be ignored.
    :raises Exception: If a single call uses more than one deprecated
        keyword that maps to the same new keyword.
    """
    def fdec(func):
        fname = func.__name__
        msg = "Deprecated keyword %s in %s() call - please use %s instead."
        msg2 = "Deprecated keyword %s in %s() call - ignoring."
        msg3 = ("Conflicting deprecated keywords (%s) in %s() call"
                " - please use new '%s' keyword instead.")

        @functools.wraps(func)
        def echo_func(*args, **kwargs):
            # count, per new keyword, how many of its deprecated aliases
            # were actually passed in this call
            counts = dict.fromkeys(keywords.values(), 0)
            for old_key, new_key in keywords.items():
                if old_key in kwargs:
                    counts[new_key] += 1
            for new_key, count in counts.items():
                # `None` as new value means that no mapping is happening,
                # so several ignored keywords can never conflict
                if new_key is None or count <= 1:
                    continue
                conflicting_keys = ", ".join(
                    old_key for old_key, target in keywords.items()
                    if target == new_key)
                # report the new keyword that is actually in conflict
                raise Exception(msg3 % (conflicting_keys, fname, new_key))
            # map deprecated keywords to new keywords; iterate over a copy
            # of the names because kwargs is modified inside the loop
            for kw in list(kwargs):
                if kw not in keywords:
                    continue
                nkw = keywords[kw]
                if nkw is None:
                    warnings.warn(msg2 % (kw, fname),
                                  category=ObsPyDeprecationWarning,
                                  stacklevel=3)
                else:
                    warnings.warn(msg % (kw, fname, nkw),
                                  category=ObsPyDeprecationWarning,
                                  stacklevel=3)
                    kwargs[nkw] = kwargs[kw]
                del kwargs[kw]
            return func(*args, **kwargs)
        return echo_func
    return fdec
@decorator
def skip_on_network_error(func, *args, **kwargs):
    """
    Decorator for unittest to mark test routines that fail with certain network
    errors (e.g. timeouts) as "skipped" rather than "Error".

    Only errors whose message matches one of the known timeout texts below
    are converted into :class:`unittest.SkipTest`. Every other exception,
    including other socket errors, propagates unchanged so that real
    failures are still reported.

    :raises unittest.SkipTest: If the wrapped routine fails with one of the
        recognized network timeout errors.
    """
    try:
        return func(*args, **kwargs)
    ###################################################
    # add more except clauses like this to add other
    # network errors that should be skipped
    except socket.timeout as e:
        if str(e) == "timed out":
            raise unittest.SkipTest(str(e))
        # not the timeout we know about: do not swallow it
        raise
    ###################################################
    except socket.error as e:
        if str(e) == "[Errno 110] Connection timed out":
            raise unittest.SkipTest(str(e))
        # any other socket error is a genuine failure of the test
        raise
@decorator
def uncompress_file(func, filename, *args, **kwargs):
    """
    Decorator that transparently unpacks compressed files and archives.

    If ``filename`` is a string pointing to a tar archive, a zip archive, a
    ``.bz2`` file or a ``.gz`` file, each unpacked payload is written to a
    temporary file and the wrapped function is called once per payload. The
    individual results are combined with ``+=`` onto the first result. In
    all other cases the wrapped function is called with ``filename`` as is.

    The keyword argument ``check_compression`` (default ``True``) is always
    removed from the call; passing ``False`` disables any unpacking.

    :raises IOError: If ``filename`` is a string that does not point to an
        existing file.
    """
    # pop unconditionally so the flag never reaches the wrapped function
    check_compression = kwargs.pop('check_compression', True)
    if not check_compression or not isinstance(filename, str):
        return func(filename, *args, **kwargs)
    if not Path(filename).exists():
        msg = "File not found '%s'" % (filename)
        raise IOError(msg)

    # unpacked contents, one bytes object per archive member / file
    payloads = []
    if tarfile.is_tarfile(filename):
        try:
            # stream mode with transparent compression detection
            with tarfile.open(filename, 'r|*') as archive:
                for member in archive:
                    # regular files only
                    if member.isfile():
                        content = archive.extractfile(member).read()
                        # Empty members are never useful; dropping them
                        # also protects against waveform files that
                        # happen to look like (slightly) valid tar files.
                        if content:
                            payloads.append(content)
        except Exception:
            # best effort: keep whatever was collected so far
            pass
    elif zipfile.is_zipfile(filename):
        try:
            with zipfile.ZipFile(filename) as archive:
                if b'obspy_no_uncompress' in archive.comment:
                    # Plugins built on the zip format can opt out of
                    # unpacking by tagging the archive comment,
                    # see issue #3192.
                    payloads = None
                else:
                    payloads = [archive.read(member)
                                for member in archive.namelist()]
        except Exception:
            pass
    elif filename.endswith('.bz2'):
        try:
            import bz2
            with open(filename, 'rb') as fh:
                payloads.append(bz2.decompress(fh.read()))
        except Exception:
            pass
    elif filename.endswith('.gz'):
        try:
            import gzip
            with gzip.open(filename, 'rb') as fh:
                payloads.append(fh.read())
        except Exception:
            pass

    if not payloads:
        # nothing was unpacked: hand over the original file
        return func(filename, *args, **kwargs)

    # feed every payload to the wrapped function via a temporary file
    result = None
    for payload in payloads:
        with NamedTemporaryFile() as tempfile:
            tempfile._fileobj.write(payload)
            stream = func(tempfile.name, *args, **kwargs)
            # the first result is the base, later ones are added to it
            if result is None:
                result = stream
            else:
                result += stream
    return result
@decorator
def raise_if_masked(func, *args, **kwargs):
    """
    Refuse to run the wrapped routine on masked data.

    The first argument (self in case of methods) is inspected: if it has a
    ``traces`` attribute, the ``data`` of every item it yields is checked;
    if it has a ``data`` attribute that is a numpy array, only that array
    is checked (this takes precedence).

    :raises NotImplementedError: If any checked array contains masked
        values.
    """
    target = args[0]
    # looks like a Stream: collect the data of all contained traces
    candidates = [tr.data for tr in target] \
        if hasattr(target, "traces") else []
    # looks like a Trace: only its own data matters
    if hasattr(target, "data") and isinstance(target.data, np.ndarray):
        candidates = [target.data]
    if any(np.ma.is_masked(arr) for arr in candidates):
        msg = ("Trace with masked values found. This is not "
               "supported for this operation. Try the split() "
               "method on Trace/Stream to produce a Stream with "
               "unmasked Traces.")
        raise NotImplementedError(msg)
    return func(*args, **kwargs)
@decorator
def skip_if_no_data(func, *args, **kwargs):
    """
    Only run the wrapped routine if its first argument evaluates as true.

    Intended for methods of Trace (first argument is self): for a Trace
    with no data in it the call is skipped and ``None`` is returned.
    """
    if args[0]:
        return func(*args, **kwargs)
    # nothing to work on
    return None
def map_example_filename(arg_kwarg_name):
    """
    Decorator that replaces "/path/to/filename" patterns in the arg or kwarg
    of the specified name with the correct file path. If the pattern is not
    encountered nothing is done.

    .. note::
        Actually, this is not a decorator itself but a decorator factory,
        returning the correct decorator for the specified options. It can be
        used just like a decorator.

    The value is only touched if it is a string starting with "/path/to/".
    If ``get_example_file`` raises ``IOError`` for the remaining file name,
    the original value is passed on unchanged.

    :type arg_kwarg_name: str
    :param arg_kwarg_name: name of the arg/kwarg that should be (tried) to map
    """
    prefix = '/path/to/'

    def _resolve(value):
        # Return the example file path for a "/path/to/..." string, or the
        # value itself if it is not such a string or no file is found.
        if not isinstance(value, str) or not value.startswith(prefix):
            return value
        try:
            return get_example_file(value[len(prefix):])
        # file not found by get_example_file:
        except IOError:
            return value

    @decorator
    def _map_example_filename(func, *args, **kwargs):
        # check kwargs
        if arg_kwarg_name in kwargs:
            kwargs[arg_kwarg_name] = _resolve(kwargs[arg_kwarg_name])
            return func(*args, **kwargs)
        # check args: find the position of the parameter in the signature
        param_names = list(inspect.signature(func).parameters)
        try:
            ind = param_names.index(arg_kwarg_name)
        except ValueError:
            # the wrapped function has no parameter of that name
            return func(*args, **kwargs)
        # the parameter may have been left at its default, i.e. there may
        # be fewer positional arguments than the signature declares
        if ind < len(args):
            args = args[:ind] + (_resolve(args[ind]),) + args[ind + 1:]
        return func(*args, **kwargs)
    return _map_example_filename
if __name__ == "__main__":
    # Executed as a script: run the doctests of this module, without
    # reporting objects that carry no examples.
    import doctest

    doctest.testmod(exclude_empty=True)