#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Various additional utilities for obspy.signal.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import ctypes as C # NOQA
import math
import numpy as np
from scipy import fftpack, signal
from obspy.core.util.misc import factorize_int
from obspy.signal.headers import clibsignal
def util_geo_km(orig_lon, orig_lat, lon, lat):
"""
Transform lon, lat to km with reference to orig_lon and orig_lat on the
elliptic Earth.
>>> util_geo_km(12.0, 48.0, 12.0, 48.0)
(0.0, 0.0)
>>> x, y = util_geo_km(12.0, 48.0, 13.0, 49.0)
>>> print(round(x,7))
73.9041417
>>> print(round(y,7))
111.1908262
:param orig_lon: Longitude of reference origin
:param orig_lat: Latitude of reference origin
:param lon: Longitude of the point to convert to a relative coordinate in km
:param lat: Latitude of the point to convert to a relative coordinate in km
:return: x, y coordinates in km (relative to the origin)
"""
# 2009-10-11 Moritz
x = C.c_double(lon)
y = C.c_double(lat)
clibsignal.utl_geo_km(orig_lon, orig_lat, 0.0, C.byref(x), C.byref(y))
return x.value, y.value
def util_lon_lat(orig_lon, orig_lat, x, y):
"""
Transform x, y [km] to decimal degrees with reference to orig_lon and
orig_lat.
>>> util_lon_lat(12.0, 48.0, 0.0, 0.0)
(12.0, 48.0)
>>> lon, lat = util_lon_lat(12.0, 48.0, 73.9041, 111.1908)
>>> print("%.4f, %.4f" % (lon, lat))
13.0000, 49.0000
:param orig_lon: Longitude of reference origin
:param orig_lat: Latitude of reference origin
:param x: x coordinate [km] relative to the origin
:param y: y coordinate [km] relative to the origin
:return: lon, lat coordinates in degrees (absolute)
"""
# 2009-10-11 Moritz
clibsignal.utl_lonlat.argtypes = [C.c_double, C.c_double, C.c_double,
C.c_double, C.POINTER(C.c_double),
C.POINTER(C.c_double)]
clibsignal.utl_lonlat.restype = C.c_void_p
lon = C.c_double()
lat = C.c_double()
clibsignal.utl_lonlat(orig_lon, orig_lat, x, y, C.byref(lon), C.byref(lat))
return lon.value, lat.value
def next_pow_2(i):
"""
Find the next power of two
>>> int(next_pow_2(5))
8
>>> int(next_pow_2(250))
256
"""
# do not use NumPy here, math is much faster for single values
buf = math.ceil(math.log(i) / math.log(2))
return int(math.pow(2, buf))
def prev_pow_2(i):
"""
Find the previous power of two
>>> prev_pow_2(5)
4
>>> prev_pow_2(250)
128
"""
# do not use NumPy here, math is much faster for single values
return int(math.pow(2, math.floor(math.log(i, 2))))
def nearest_pow_2(x):
"""
Finds the nearest power of 2 to the given number.
In contrast to :func:`next_pow_2`, the next smaller power of 2 is also
considered and is returned if it is closer to the input than the next
bigger power of 2.
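For example, the result can be smaller or larger than the input:
>>> nearest_pow_2(5)
4
>>> nearest_pow_2(7)
8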
"""
a = math.pow(2, math.ceil(math.log(x, 2)))
b = math.pow(2, math.floor(math.log(x, 2)))
if abs(a - x) < abs(b - x):
return int(a)
else:
return int(b)
def enframe(x, win, inc):
"""
Splits the vector up into (overlapping) frames beginning at increments
of inc. Each frame is multiplied by the window win().
The length of the frames is given by the length of the window win().
In 1-based (MATLAB-style) notation, the centre of frame I is
x((I-1)*inc+(length(win)+1)/2) for I=1,2,...
The mean is also subtracted from each individual frame.
:param x: signal to split into frames
:param win: window applied to each frame; its length sets the frame length
:param inc: increment by which successive frames are shifted, in samples
:return f: output matrix, each frame occupies one row
:return length, no_win: length of each frame in samples, number of frames
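A short, illustrative example with a flat (all-ones) window, mainly to show
the returned shapes:
>>> f, flen, no_win = enframe(np.arange(24.0), np.ones(8), 4)
>>> f.shape, flen, no_win
((5, 8), 8, 5)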
"""
nx = len(x)
nwin = len(win)
if (nwin == 1):
length = win
else:
# length = next_pow_2(nwin)
length = nwin
nf = int(np.fix((nx - length + inc) // inc))
# f = np.zeros((nf, length))
indf = inc * np.arange(nf)
inds = np.arange(length) + 1
f = x[(np.transpose(np.vstack([indf] * length)) +
np.vstack([inds] * nf)) - 1]
if (nwin > 1):
w = np.transpose(win)
f = f * np.vstack([w] * nf)
f = signal.detrend(f, type='constant')
no_win, _ = f.shape
return f, length, no_win
def smooth(x, smoothie):
"""
Smooths a given signal by computing a central moving average.
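A small, purely illustrative example on a linear ramp; the output keeps the
length of the input:
>>> out = smooth(np.arange(10.0), 2)
>>> len(out)
10
>>> print(out[5])
5.0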
:param x: signal to smooth
:param smoothie: number of past/future samples used for the moving average
:return out: smoothed signal
"""
size_x = np.size(x)
if smoothie > 0:
# 2-D input: smoothing is applied along the first axis
if (len(x) > 1 and len(x) < size_x):
# out_add = append(append([x[0,:]]*smoothie,x,axis=0),
# [x[(len(x)-1),:]]*smoothie,axis=0)
# out_add = (np.append([x[0, :]]*int(smoothie), x, axis=0))
out_add = np.vstack(([x[0, :]] * int(smoothie), x,
[x[(len(x) - 1), :]] * int(smoothie)))
padded = np.transpose(out_add)
# out = signal.lfilter(np.ones(smoothie) / smoothie, 1, help)
out = signal.lfilter(
np.hstack((np.ones(smoothie) / (2 * smoothie), 0,
np.ones(smoothie) / (2 * smoothie))), 1, padded)
out = np.transpose(out)
# out = out[smoothie:len(out), :]
out = out[2 * smoothie:len(out), :]
# out = filter(ones(1,smoothie)/smoothie,1,out_add)
# out[1:smoothie,:] = []
else:  # 1-D input: pad both ends with the edge values before filtering
# out_add = np.append(np.append([x[0]] * smoothie, x),
# [x[size_x - 1]] * smoothie)
out_add = np.hstack(([x[0]] * int(smoothie), x,
[x[(len(x) - 1)]] * int(smoothie)))
out = signal.lfilter(np.hstack((
np.ones(smoothie) / (2 * smoothie), 0,
np.ones(smoothie) / (2 * smoothie))), 1, out_add)
out = out[2 * smoothie:len(out)]
out[0:smoothie] = out[smoothie]
out[len(out) - smoothie:len(out)] = out[len(out) - smoothie - 1]
# for i in xrange(smoothie, len(x) + smoothie):
# sum = 0
# for k in xrange(-smoothie, smoothie):
# sum = sum + out_add[i + k]
# suma[i - smoothie] = float(sum) / (2 * smoothie)
# out = suma
# out[0:smoothie] = out[smoothie]
# out[size_x - 1 - smoothie:size_x] = \
# out[size_x - 1 - smoothie]
else:
out = x
return out
def rdct(x, n=0):
"""
Computes the discrete cosine transform of a given signal.
The signal is truncated/padded to length n.
:param x: signal to compute the discrete cosine transform of
:param n: window length (default: signal length)
:return y: discrete cosine transform
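A minimal shape check on a constant signal (values chosen only for
illustration):
>>> rdct(np.ones((4, 2))).shape
(4, 2)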
"""
m, k = x.shape
if (n == 0):
n = m
a = np.sqrt(2 * n)
x = np.append([x[0:n:2, :]], [x[2 * int(np.fix(n / 2)):0:-2, :]],
axis=1)
x = x[0, :, :]
z = np.append(np.sqrt(2.), 2. * np.exp((-0.5j * float(np.pi / n)) *
np.arange(1, n)))
y = np.real(np.multiply(np.transpose(fftpack.fft(np.transpose(x))),
np.transpose(np.array([z])) * np.ones(k))) / float(a)
return y
def az2baz2az(angle):
"""
Helper function to convert from azimuth to backazimuth or from backazimuth
to azimuth.
:type angle: float or int
:param angle: azimuth or backazimuth value in degrees between 0 and 360.
:return: corresponding backazimuth or azimuth value in degrees.
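For example:
>>> az2baz2az(90)
270
>>> az2baz2az(270)
90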
"""
if 0 <= angle <= 180:
new_angle = angle + 180
elif 180 < angle <= 360:
new_angle = angle - 180
else:
raise ValueError("Input (back)azimuth out of bounds: %s" % angle)
return new_angle
def _npts2nfft(npts, smart=True):
"""
Calculates the number of points for the FFT from the number of samples in
the trace.
When the minimum number of points has a bad factorization involving large
primes (which can take forever to compute), a few slightly larger numbers
are tried in order to find one with a good factorization (the time spent on
factorization is negligible compared to fft/evalresp/ifft). If that fails,
the next power of 2 is used, which is not the fastest choice but a robust
one.
>>> _npts2nfft(1800028) # good nfft with minimum points
3600056
>>> int(_npts2nfft(1800029)) # falls back to next power of 2
4194304
>>> _npts2nfft(1800031) # finds suitable nfft close to minimum npts
3600082
"""
# The number of points for the FFT has to be at least 2 * ndat (in
# order to prohibit wrap-around effects during convolution), cf.
# Numerical Recipes p. 429.
# evalresp scales directly with nfft, therefore taking the next power of
# two has a greater negative performance impact than the slowdown caused
# by a non-power-of-2 length in the FFT.
if npts & 0x1:  # check if odd
nfft = 2 * (npts + 1)
else:
nfft = 2 * npts
def _good_factorization(x):
if max(factorize_int(x)) < 500:
return True
return False
# check if we have a bad factorization with large primes
if smart and nfft > 5000 and not _good_factorization(nfft):
# try a few numbers slightly larger for a suitable factorization
# in most cases after less than 10 tries a suitable nfft number with
# good factorization is found
for i_ in range(1, 11):
trial = int(nfft + 2 * i_)
if _good_factorization(trial):
nfft = trial
break
else:
nfft = next_pow_2(nfft)
return nfft
def stack(data, stack_type='linear'):
"""
Stack data along the first axis.
:param data: data to stack; the stack is computed along the first axis
:type stack_type: str or tuple
:param stack_type: Type of stack, one of the following:
``'linear'``: average stack (default),
``('pw', order)``: phase weighted stack of given order
(see [Schimmel1997]_, order 0 corresponds to linear stack),
``('root', order)``: root stack of given order
(order 1 corresponds to linear stack).
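A minimal example of the default linear stack (purely illustrative values):
>>> stack(np.array([[0., 2.], [4., 6.]])).tolist()
[2.0, 4.0]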
"""
if stack_type == 'linear':
stack = np.mean(data, axis=0)
elif stack_type[0] == 'pw':
from scipy.signal import hilbert
try:
from scipy.fftpack import next_fast_len
except ImportError: # scipy < 0.18
next_fast_len = next_pow_2
npts = np.shape(data)[1]
nfft = next_fast_len(npts)
anal_sig = hilbert(data, N=nfft)[:, :npts]
norm_anal_sig = anal_sig / np.abs(anal_sig)
phase_stack = np.abs(np.mean(norm_anal_sig, axis=0)) ** stack_type[1]
stack = np.mean(data, axis=0) * phase_stack
elif stack_type[0] == 'root':
r = np.mean(np.sign(data) * np.abs(data)
** (1 / stack_type[1]), axis=0)
stack = np.sign(r) * np.abs(r) ** stack_type[1]
else:
raise ValueError('stack type is not valid.')
return stack
if __name__ == '__main__':
import doctest
doctest.testmod(exclude_empty=True)