# -*- coding: utf-8 -*-
# -----------------------------------------------------------------
# Filename: freqattributes.py
# Author: Conny Hammer
# Email: conny.hammer@geo.uni-potsdam.de
#
# Copyright (C) 2008-2012 Conny Hammer
# -----------------------------------------------------------------
"""
Frequency Attributes
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
from operator import itemgetter
import numpy as np
from scipy import fftpack, signal, sparse
from obspy.signal import util
from obspy.signal.invsim import corn_freq_2_paz, simulate_seismometer
[docs]
def spectrum(data, win, nfft, n1=0, n2=0):
"""
Spectrum of a signal.
Computes the spectrum of the given data which can be windowed or not. The
spectrum is estimated using the modified periodogram. If n1 and n2 are not
specified the periodogram of the entire sequence is returned.
The modified periodogram of the given signal is returned.
:type data: :class:`~numpy.ndarray`
:param data: Data to make spectrum of.
:param win: Window to multiply with given signal.
:param nfft: Number of points for FFT.
:type n1: int, optional
:param n1: Starting index, defaults to ``0``.
:type n2: int, optional
:param n2: Ending index, defaults to ``0``.
:return: Spectrum.
"""
if (n2 == 0):
n2 = len(data)
n = n2 - n1
u = pow(np.linalg.norm([win]), 2) / n
xw = data * win
px = pow(abs(fftpack.fft(xw, nfft)), 2) / (n * u)
px[0] = px[1]
return px
[docs]
def welch(data, win, nfft, l=0, over=0): # NOQA
"""
Spectrum of a signal.
Computes the spectrum of the given data which can be windowed or not.
The spectrum is estimated using Welch's method of averaging modified
periodograms.
Welch's estimate of the power spectrum is returned using a linear scale.
:type data: :class:`~numpy.ndarray`
:param data: Data to make spectrum of.
:param win: Window to multiply with given signal.
:param nfft: Number of points for FFT.
:type l: int, optional
:param l: Length of windows to be averaged, defaults to ``0``.
:type over: float, optional
:param over: Overlap of windows to be averaged 0<over<1, defaults to ``0``.
:return: Spectrum.
"""
if (l == 0): # NOQA
l = len(data) # NOQA
n1 = 0
n2 = l
n0 = (1. - float(over)) * l
nsect = 1 + int(np.floor((len(data) - l) / (n0)))
px = 0
for _i in range(nsect):
px = px + spectrum(data, win, nfft, n1, n2) / nsect
n1 = n1 + n0
n2 = n2 + n0
return px
[docs]
def central_frequency(data, fs, smoothie, fk):
"""
Central frequency of a signal.
Computes the central frequency of the given data which can be windowed or
not. The central frequency is a measure of the frequency where the
power is concentrated. It corresponds to the second moment of the power
spectral density function.
The central frequency is returned.
:type data: :class:`~numpy.ndarray`
:param data: Data to estimate central frequency from.
:param fs: Sampling frequency in Hz.
:param smoothie: Factor for smoothing the result.
:param fk: Coefficients for calculating time derivatives
(calculated via central difference).
:return: **cfreq[, dcfreq]** - Central frequency, Time derivative of center
frequency (windowed only).
"""
# for windowed data
if np.size(data.shape) > 1:
cfreq = np.zeros(data.shape[0])
i = 0
for row in data:
cfreq[i] = central_frequency_unwindowed(row, fs)
i = i + 1
cfreq = util.smooth(cfreq, smoothie)
# cfreq_add = \
# np.append(np.append([cfreq[0]] * (np.size(fk) // 2), cfreq),
# [cfreq[np.size(cfreq) - 1]] * (np.size(fk) // 2))
# faster alternative
cfreq_add = np.hstack(
([cfreq[0]] * (np.size(fk) // 2), cfreq,
[cfreq[np.size(cfreq) - 1]] * (np.size(fk) // 2)))
dcfreq = signal.lfilter(fk, 1, cfreq_add)
# dcfreq = dcfreq[np.size(fk) // 2:(np.size(dcfreq) -
# np.size(fk) // 2)]
# correct start and end values of time derivative
dcfreq = dcfreq[np.size(fk) - 1:np.size(dcfreq)]
return cfreq, dcfreq
# for unwindowed data
else:
cfreq = central_frequency_unwindowed(data, fs)
return cfreq
[docs]
def central_frequency_unwindowed(data, fs):
"""
Central frequency of a signal.
Computes the central frequency of the given data (a single waveform).
The central frequency is a measure of the frequency where the
power is concentrated. It corresponds to the second moment of the power
spectral density function.
The central frequency is returned in Hz.
:type data: :class:`~numpy.ndarray`
:param data: Data to estimate central frequency from.
:param fs: Sampling frequency in Hz.
:return: **cfreq** - Central frequency in Hz
"""
nfft = util.next_pow_2(len(data))
freq = np.linspace(0, fs, nfft + 1)
freqaxis = freq[0:nfft // 2]
px_wm = welch(data, np.hamming(len(data)), nfft)
px = px_wm[0:len(px_wm) // 2]
cfreq = np.sqrt(np.sum(freqaxis ** 2 * px) / (sum(px)))
return cfreq
[docs]
def bandwidth(data, fs, smoothie, fk):
"""
Bandwidth of a signal.
Computes the bandwidth of the given data which can be windowed or not.
The bandwidth corresponds to the level where the power of the spectrum is
half its maximum value. It is determined as the level of 1/sqrt(2) times
the maximum Fourier amplitude.
If data are windowed the bandwidth of each window is returned.
:type data: :class:`~numpy.ndarray`
:param data: Data to make envelope of.
:param fs: Sampling frequency in Hz.
:param smoothie: Factor for smoothing the result.
:param fk: Coefficients for calculating time derivatives
(calculated via central difference).
:return: **bandwidth[, dbwithd]** - Bandwidth, Time derivative of
predominant period (windowed only).
"""
new_dtype = np.float32 if data.dtype.itemsize == 4 else np.float64
data = np.require(data, dtype=new_dtype)
nfft = util.next_pow_2(data.shape[1])
freqaxis = np.linspace(0, fs, nfft + 1)
bwith = np.zeros(data.shape[0])
f = fftpack.fft(data, nfft)
f_sm = util.smooth(abs(f[:, 0:nfft // 2]), 10)
if np.size(data.shape) > 1:
i = 0
for row in f_sm:
minfc = abs(row - max(abs(row * (1 / np.sqrt(2)))))
[mdist_ind, _mindist] = min(enumerate(minfc), key=itemgetter(1))
bwith[i] = freqaxis[mdist_ind]
i = i + 1
# bwith_add = \
# np.append(np.append([bandwidth[0]] * (np.size(fk) // 2), bandwidth),
# [bandwidth[np.size(bandwidth) - 1]] * (np.size(fk) // 2))
# faster alternative
bwith_add = np.hstack(
([bwith[0]] * (np.size(fk) // 2), bwith,
[bwith[np.size(bwith) - 1]] * (np.size(fk) // 2)))
dbwith = signal.lfilter(fk, 1, bwith_add)
# dbwith = dbwith[np.size(fk) // 2:(np.size(dbwith) -
# np.size(fk) // 2)]
# correct start and end values of time derivative
dbwith = dbwith[np.size(fk) - 1:]
bwith = util.smooth(bwith, smoothie)
dbwith = util.smooth(dbwith, smoothie)
return bwith, dbwith
else:
minfc = abs(data - max(abs(data * (1 / np.sqrt(2)))))
[mdist_ind, _mindist] = min(enumerate(minfc), key=itemgetter(1))
bwith = freqaxis[mdist_ind]
return bwith
[docs]
def dominant_period(data, fs, smoothie, fk):
"""
Predominant period of a signal.
Computes the predominant period of the given data which can be windowed or
not. The period is determined as the period of the maximum value of the
Fourier amplitude spectrum.
If data are windowed the predominant period of each window is returned.
:type data: :class:`~numpy.ndarray`
:param data: Data to determine predominant period of.
:param fs: Sampling frequency in Hz.
:param smoothie: Factor for smoothing the result.
:param fk: Coefficients for calculating time derivatives
(calculated via central difference).
:return: **dperiod[, ddperiod]** - Predominant period, Time derivative of
predominant period (windowed only).
"""
new_dtype = np.float32 if data.dtype.itemsize == 4 else np.float64
data = np.require(data, dtype=new_dtype)
nfft = 1024
# nfft = util.next_pow_2(data.shape[1])
freqaxis = np.linspace(0, fs, nfft + 1)
dperiod = np.zeros(data.shape[0])
f = fftpack.fft(data, nfft)
# f_sm = util.smooth(abs(f[:,0:nfft // 2]),1)
f_sm = f[:, 0:nfft // 2]
if np.size(data.shape) > 1:
i = 0
for row in f_sm:
[mdist_ind, _mindist] = max(enumerate(abs(row)), key=itemgetter(1))
dperiod[i] = freqaxis[mdist_ind]
i = i + 1
# dperiod_add = np.append(np.append([dperiod[0]] * \
# (np.size(fk) // 2), dperiod),
# [dperiod[np.size(dperiod) - 1]] * (np.size(fk) // 2))
# faster alternative
dperiod_add = np.hstack(
([dperiod[0]] * (np.size(fk) // 2), dperiod,
[dperiod[np.size(dperiod) - 1]] * (np.size(fk) // 2)))
ddperiod = signal.lfilter(fk, 1, dperiod_add)
# ddperiod = ddperiod[np.size(fk) / \
# 2:(np.size(ddperiod) - np.size(fk) // 2)]
# correct start and end values of time derivative
ddperiod = ddperiod[np.size(fk) - 1:]
dperiod = util.smooth(dperiod, smoothie)
ddperiod = util.smooth(ddperiod, smoothie)
return dperiod, ddperiod
else:
[mdist_ind, _mindist] = max(enumerate(abs(data)), key=itemgetter(1))
dperiod = freqaxis[mdist_ind]
return dperiod
[docs]
def log_spaced_filterbank_matrix(p, n, fs, w):
"""
Matrix for a log-spaced filterbank.
Computes a matrix containing the filterbank amplitudes for a log-spaced
filterbank.
:param p: Number of filters in filterbank.
:param n: Length of fft.
:param fs: Sampling frequency in Hz.
:param w: Window function.
:return: **xx, yy, zz** - Matrix containing the filterbank amplitudes,
Lowest fft bin with a non-zero coefficient, Highest fft bin with a
non-zero coefficient.
"""
# alternative to avoid above problems: low end of the lowest filter
# corresponds to maximum frequency resolution
fn2 = np.floor(n / 2)
fl = np.floor(fs) / np.floor(n)
fh = np.floor(fs / 2)
lr = np.log((fh) / (fl)) / (p + 1)
bl = n * ((fl) *
np.exp(np.array([0, 1, p, p + 1]) * float(lr)) / float(fs))
b2 = int(np.ceil(bl[1]))
b3 = int(np.floor(bl[2]))
b1 = int(np.floor(bl[0])) + 1
b4 = int(min(fn2, np.ceil(bl[3]))) - 1
pf = np.log(np.arange(b1 - 1, b4 + 1, dtype=np.float64) / n * fs / fl) / lr
fp = np.floor(pf).astype(np.int64)
pm = pf - fp
k2 = b2 - b1 + 1
k3 = b3 - b1 + 1
k4 = b4 - b1 + 1
r = np.append(fp[k2:k4 + 2], 1 + fp[1:k3 + 1]) - 1
c = np.append(np.arange(k2, k4 + 1), np.arange(1, k3 + 1)) - 1
v = 2 * np.append([1 - pm[k2:k4 + 1]], [pm[1:k3 + 1]])
mn = b1 + 1
mx = b4 + 1
# x = np.array([[c],[r]], dtype=[('x', float), ('y', float)])
# ind=np.argsort(x, order=('x','y'))
if (w == 'Hann'):
v = 1. - [np.cos([v * float(np.pi / 2.)])]
elif (w == 'Hamming'):
v = 1. - 0.92 / 1.08 * np.cos(v * float(np.pi / 2))
# bugfix for #70 - scipy.sparse.csr_matrix() delivers sometimes a
# transposed matrix depending on the installed NumPy version - using
# scipy.sparse.coo_matrix() ensures compatibility with old NumPy versions
xx = np.array(sparse.coo_matrix((v, (c, r))).transpose().todense())
return xx, mn - 1, mx - 1
[docs]
def log_cepstrum(data, fs, nc, p, n, w): # @UnusedVariable: n is never used!!!
"""
Cepstrum of a signal.
Computes the cepstral coefficient on a logarithmic scale of the given data
which can be windowed or not.
If data are windowed the analytic signal and the envelope of each window is
returned.
:type data: :class:`~numpy.ndarray`
:param data: Data to make envelope of.
:param fs: Sampling frequency in Hz.
:param nc: number of cepstral coefficients.
:param p: Number of filters in filterbank.
:param n: Number of data windows.
:return: Cepstral coefficients.
"""
new_dtype = np.float32 if data.dtype.itemsize == 4 else np.float64
data = np.require(data, dtype=new_dtype)
data_t = np.transpose(data)
nfft = util.next_pow_2(data_t.shape[0])
fc = fftpack.fft(data_t, nfft, 0)
f = fc[1:len(fc) // 2 + 1, :]
m, a, b = log_spaced_filterbank_matrix(p, nfft, fs, w)
pw = np.real(np.multiply(f[a:b, :], np.conj(f[a:b, :])))
pth = np.max(pw) * 1E-20
ath = np.sqrt(pth)
# h1 = np.transpose(np.array([[ath] * int(b + 1 - a)]))
# h2 = m * abs(f[a - 1:b, :])
y = np.log(np.maximum(m @ abs(f[a - 1:b, :]), ath))
z = util.rdct(y)
z = z[1:, :]
# nc = nc + 1
nf = np.size(z, 1)
if (p > nc):
z = z[:nc, :]
elif (p < nc):
z = np.vstack([z, np.zeros(nf, nc - p)])
return z
[docs]
def peak_ground_motion(data, delta, freq, damp=0.1):
"""
Peak ground motion parameters
Compute the maximal displacement, velocity, acceleration and the peak
ground acceleration at a certain frequency (standard periods for
ShakeMaps are 0.3/1.0/3.0 seconds. Note that the above input is expected as
frequency in Hertz.).
Data must be displacement
:type data: :class:`~numpy.ndarray`
:param data: Data in displacement to convolve with pendulum at freq.
:type delta: float
:param delta: Sampling interval
:type freq: float
:param freq: Frequency in Hz.
:type damp: float
:param damp: damping factor. Default is set to 0.1
:rtype: (float, float, float, float)
:return: Peak Ground Acceleration, maximal displacement, velocity,
acceleration
"""
data = data.copy()
# Displacement
if abs(max(data)) >= abs(min(data)):
m_dis = abs(max(data))
else:
m_dis = abs(min(data))
# Velocity
data = np.gradient(data, delta)
if abs(max(data)) >= abs(min(data)):
m_vel = abs(max(data))
else:
m_vel = abs(min(data))
# Acceleration
data = np.gradient(data, delta)
if abs(max(data)) >= abs(min(data)):
m_acc = abs(max(data))
else:
m_acc = abs(min(data))
samp_rate = 1.0 / delta
t = freq * 1.0
d = damp
omega = (2 * 3.14159 * t) ** 2
paz_sa = corn_freq_2_paz(t, damp=d)
paz_sa['sensitivity'] = omega
paz_sa['zeros'] = []
data = simulate_seismometer(data, samp_rate, paz_remove=None,
paz_simulate=paz_sa, taper=True,
simulate_sensitivity=True, taper_fraction=0.05)
if abs(max(data)) >= abs(min(data)):
pga = abs(max(data))
else:
pga = abs(min(data))
return (pga, m_dis, m_vel, m_acc)