# -*- coding: utf-8 -*-
"""
Decorator used in ObsPy.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import functools
import inspect
from pathlib import Path
import re
import socket
import tarfile
import warnings
import zipfile
import numpy as np
from decorator import decorator
from obspy.core.util import get_example_file
from obspy.core.util.base import NamedTemporaryFile
from obspy.core.util.deprecation_helpers import ObsPyDeprecationWarning
[docs]
def deprecated(warning_msg=None):
"""
This is a decorator which can be used to mark functions as deprecated.
.. note::
Actually, this is not a decorator itself but a decorator factory,
returning the correct decorator for the specified options. It can be
used just like a decorator.
It will result in a warning being emitted when the function is used.
"""
@decorator
def _deprecated(func, *args, **kwargs):
if 'deprecated' in str(func.__doc__).lower():
msg = func.__doc__
elif warning_msg:
msg = warning_msg
func.__doc__ = warning_msg
else:
msg = "Call to deprecated function %s." % func.__name__
warnings.warn(msg, category=ObsPyDeprecationWarning, stacklevel=3)
return func(*args, **kwargs)
return _deprecated
[docs]
def deprecated_keywords(keywords):
"""
Decorator for marking keywords as deprecated.
.. note::
Actually, this is not a decorator itself but a decorator factory,
returning the correct decorator for the specified options. It can be
used just like a decorator.
:type keywords: dict
:param keywords: old/new keyword names as key/value pairs.
"""
def fdec(func):
fname = func.__name__
msg = "Deprecated keyword %s in %s() call - please use %s instead."
msg2 = "Deprecated keyword %s in %s() call - ignoring."
msg3 = ("Conflicting deprecated keywords (%s) in %s() call"
" - please use new '%s' keyword instead.")
@functools.wraps(func)
def echo_func(*args, **kwargs):
# check if multiple deprecated keywords get mapped to the same new
# keyword
new_keyword_appearance_counts = dict.fromkeys(keywords.values(), 0)
for key, new_key in keywords.items():
if key in kwargs:
new_keyword_appearance_counts[new_key] += 1
for key_ in keywords.values():
# ignore `None` as new value, it means that no mapping is
# happening..
if key_ is None:
continue
if new_keyword_appearance_counts[key_] > 1:
conflicting_keys = ", ".join(
[old_key for old_key, new_key in keywords.items()
if new_key == key_])
raise Exception(msg3 % (conflicting_keys, fname, new_key))
# map deprecated keywords to new keywords
for kw in list(kwargs):
if kw in keywords:
nkw = keywords[kw]
if nkw is None:
warnings.warn(msg2 % (kw, fname),
category=ObsPyDeprecationWarning,
stacklevel=3)
else:
warnings.warn(msg % (kw, fname, nkw),
category=ObsPyDeprecationWarning,
stacklevel=3)
kwargs[nkw] = kwargs[kw]
del kwargs[kw]
return func(*args, **kwargs)
return echo_func
return fdec
@decorator
def skip_on_network_error(func, *args, **kwargs):
"""
Decorator to mark test routines that fail with certain network
errors (e.g. timeouts) as "skipped" rather than "Error".
"""
import pytest # NOQA
try:
return func(*args, **kwargs)
###################################################
# add more except clauses like this to add other
# network errors that should be skipped
except socket.timeout as e:
if str(e) == "timed out":
pytest.skip(str(e))
###################################################
except socket.error as e:
if str(e) == "[Errno 110] Connection timed out":
pytest.skip(str(e))
# general except to be able to generally reraise
except Exception:
raise
@decorator
def uncompress_file(func, filename, *args, **kwargs):
"""
Decorator used for temporary uncompressing file if .gz or .bz2 archive.
"""
if not kwargs.pop('check_compression', True):
return func(filename, *args, **kwargs)
if not isinstance(filename, str):
return func(filename, *args, **kwargs)
elif not Path(filename).exists():
msg = "File not found '%s'" % (filename)
raise IOError(msg)
# check if we got a compressed file or archive
obj_list = []
if tarfile.is_tarfile(filename):
try:
# reading with transparent compression
with tarfile.open(filename, 'r|*') as tar:
for tarinfo in tar:
# only handle regular files
if not tarinfo.isfile():
continue
data = tar.extractfile(tarinfo).read()
# Skip empty files - we don't need them no matter what
# and it guards against rare cases where waveforms files
# are also slightly valid tar-files.
if not data:
continue
obj_list.append(data)
except Exception:
pass
elif zipfile.is_zipfile(filename):
try:
with zipfile.ZipFile(filename) as zip:
if b'obspy_no_uncompress' in zip.comment:
# be nice to plugins based on zip format
# do not uncompress the file if tag is present
# see issue #3192
obj_list = None
else:
obj_list = [zip.read(name) for name in zip.namelist()]
except Exception:
pass
elif filename.endswith('.bz2'):
# bz2 module
try:
import bz2
with open(filename, 'rb') as fp:
obj_list.append(bz2.decompress(fp.read()))
except Exception:
pass
elif filename.endswith('.gz'):
# gzip module
try:
import gzip
with gzip.open(filename, 'rb') as fp:
obj_list.append(fp.read())
except Exception:
pass
# handle results
if obj_list:
# write results to temporary files
result = None
for obj in obj_list:
with NamedTemporaryFile() as tempfile:
tempfile._fileobj.write(obj)
stream = func(tempfile.name, *args, **kwargs)
# just add other stream objects to first stream
if result is None:
result = stream
else:
result += stream
else:
# no compressions
result = func(filename, *args, **kwargs)
return result
@decorator
def raise_if_masked(func, *args, **kwargs):
"""
Raises if the first argument (self in case of methods) is a Trace with
masked values or a Stream containing a Trace with masked values.
"""
arrays = []
# first arg seems to be a Stream
if hasattr(args[0], "traces"):
arrays = [tr.data for tr in args[0]]
# first arg seems to be a Trace
if hasattr(args[0], "data") and isinstance(args[0].data, np.ndarray):
arrays = [args[0].data]
for arr in arrays:
if np.ma.is_masked(arr):
msg = "Trace with masked values found. This is not " + \
"supported for this operation. Try the split() " + \
"method on Trace/Stream to produce a Stream with " + \
"unmasked Traces."
raise NotImplementedError(msg)
return func(*args, **kwargs)
@decorator
def skip_if_no_data(func, *args, **kwargs):
"""
Does nothing if the first argument (self in case of methods) is a Trace
with no data in it.
"""
if not args[0]:
return
return func(*args, **kwargs)
[docs]
def map_example_filename(arg_kwarg_name):
"""
Decorator that replaces "/path/to/filename" patterns in the arg or kwarg
of the specified name with the correct file path. If the pattern is not
encountered nothing is done.
.. note::
Actually, this is not a decorator itself but a decorator factory,
returning the correct decorator for the specified options. It can be
used just like a decorator.
:type arg_kwarg_name: str
:param arg_kwarg_name: name of the arg/kwarg that should be (tried) to map
"""
@decorator
def _map_example_filename(func, *args, **kwargs):
prefix = '/path/to/'
# check kwargs
if arg_kwarg_name in kwargs:
if isinstance(kwargs[arg_kwarg_name], str):
if re.match(prefix, kwargs[arg_kwarg_name]):
try:
kwargs[arg_kwarg_name] = \
get_example_file(kwargs[arg_kwarg_name][9:])
# file not found by get_example_file:
except IOError:
pass
# check args
else:
try:
inspected_args = [
p.name
for p in inspect.signature(func).parameters.values()
]
except AttributeError:
inspected_args = inspect.getargspec(func).args
try:
ind = inspected_args.index(arg_kwarg_name)
except ValueError:
pass
else:
if ind < len(args) and isinstance(args[ind], str):
# need to check length of args from inspect
if re.match(prefix, args[ind]):
try:
args = list(args)
args[ind] = get_example_file(args[ind][9:])
args = tuple(args)
# file not found by get_example_file:
except IOError:
pass
return func(*args, **kwargs)
return _map_example_filename
if __name__ == '__main__':
import doctest
doctest.testmod(exclude_empty=True)