# Source code for obspy.signal.polarization

# -*- coding: utf-8 -*-
"""
Functions for polarization analysis.

The ObsPy Development Team (devs@obspy.org)
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import *  # NOQA

import math
import warnings

import numpy as np
import scipy.odr
from scipy import signal
from scipy.optimize import fminbound

from obspy.signal.invsim import cosine_taper

[docs]def eigval(datax, datay, dataz, fk, normf=1.0):
"""
Polarization attributes of a signal.

Computes the rectilinearity, the planarity and the eigenvalues of the given
data which can be windowed or not.
The time derivatives are calculated by central differences and the
parameter fk describes the coefficients of the used polynomial. The
values of fk depend on the order of the derivative you want to
calculate. If you do not want to use derivatives you can simply use
[1, 1, 1, 1, 1] for fk.

The algorithm is mainly based on the paper by [Jurkevics1988]_. The rest is
just the numerical differentiation by central differences (carried out by
the routine :func:scipy.signal.lfilter(data, 1, fk)).

:param datax: Data of x component. Note this is most useful with
windowed data, represented by a 2 dimensional array. First
dimension is the window number, second dimension is the data.
:type datax: :class:~numpy.ndarray
:param datay: Data of y component. See description of datax.
:type datay: :class:~numpy.ndarray
:param dataz: Data of z component. See description of datax.
:type dataz: :class:~numpy.ndarray
:param fk: Coefficients of polynomial used to calculate the time
derivatives.
:type fk: list
:param normf: Factor for normalization.
:type normf: float
:return: **leigenv1, leigenv2, leigenv3, rect, plan, dleigenv, drect,
dplan** - Smallest eigenvalue, Intermediate eigenvalue, Largest
eigenvalue, Rectilinearity, Planarity, Time derivative of eigenvalues,
time derivative of rectilinearity, Time derivative of planarity.
"""
# The function is made for windowed data (two dimensional input).
# However be nice and allow one dimensional input, see #919
datax = np.atleast_2d(datax)
datay = np.atleast_2d(datay)
dataz = np.atleast_2d(dataz)
covmat = np.zeros([3, 3])
leigenv1 = np.zeros(datax.shape[0], dtype=np.float64)
leigenv2 = np.zeros(datax.shape[0], dtype=np.float64)
leigenv3 = np.zeros(datax.shape[0], dtype=np.float64)
dleigenv = np.zeros([datax.shape[0], 3], dtype=np.float64)
rect = np.zeros(datax.shape[0], dtype=np.float64)
plan = np.zeros(datax.shape[0], dtype=np.float64)
i = 0
for i in range(datax.shape[0]):
covmat[0][0] = np.cov(datax[i, :], rowvar=False)
covmat[0][1] = covmat[1][0] = np.cov(datax[i, :], datay[i, :],
rowvar=False)[0, 1]
covmat[0][2] = covmat[2][0] = np.cov(datax[i, :], dataz[i, :],
rowvar=False)[0, 1]
covmat[1][1] = np.cov(datay[i, :], rowvar=False)
covmat[1][2] = covmat[2][1] = np.cov(dataz[i, :], datay[i, :],
rowvar=False)[0, 1]
covmat[2][2] = np.cov(dataz[i, :], rowvar=False)
_eigvec, eigenval, _v = (np.linalg.svd(covmat))
eigenv = np.sort(eigenval)
leigenv1[i] = eigenv[0]
leigenv2[i] = eigenv[1]
leigenv3[i] = eigenv[2]
rect[i] = 1 - ((eigenv[1] + eigenv[0]) / (2 * eigenv[2]))
plan[i] = 1 - ((2 * eigenv[0]) / (eigenv[1] + eigenv[2]))
leigenv1 = leigenv1 / normf
leigenv2 = leigenv2 / normf
leigenv3 = leigenv3 / normf

leigenv1_add = np.append(np.append([leigenv1[0]] * (np.size(fk) // 2),
leigenv1),
[leigenv1[np.size(leigenv1) - 1]] *
(np.size(fk) // 2))
dleigenv[:, 0] = dleigenv1[len(fk) - 1:]

leigenv2_add = np.append(np.append([leigenv2[0]] * (np.size(fk) // 2),
leigenv2),
[leigenv2[np.size(leigenv2) - 1]] *
(np.size(fk) // 2))
dleigenv[:, 1] = dleigenv2[len(fk) - 1:]

leigenv3_add = np.append(np.append([leigenv3[0]] * (np.size(fk) // 2),
leigenv3),
[leigenv3[np.size(leigenv3) - 1]] *
(np.size(fk) // 2))
dleigenv[:, 2] = dleigenv3[len(fk) - 1:]

rect_add = np.append(np.append([rect[0]] * (np.size(fk) // 2), rect),
[rect[np.size(rect) - 1]] * (np.size(fk) // 2))
drect = drect[len(fk) - 1:]

plan_add = np.append(np.append([plan[0]] * (np.size(fk) // 2), plan),
[plan[np.size(plan) - 1]] * (np.size(fk) // 2))
dplan = dplan[len(fk) - 1:]

return leigenv1, leigenv2, leigenv3, rect, plan, dleigenv, drect, dplan

[docs]def flinn(stream, noise_thres=0):
"""
Computes the azimuth, incidence, rectilinearity and planarity after the
eigenstructure decomposition method of [Flinn1965b]_.

:param stream: ZNE sorted trace data
:type stream: List of ZNE sorted numpy arrays
:param noise_tresh: Variance of the noise sphere; data points are excluded
when falling within the sphere of radius sqrt(noise_thres),
default is set to 0.
:type noise_thres: float
:returns:  azimuth, incidence, rectilinearity, and planarity
"""
mask = (stream[0][:] ** 2 + stream[1][:] ** 2 + stream[2][:] ** 2
) > noise_thres
# East
# North
# Z

covmat = np.cov(x)
eigvec, eigenval, v = np.linalg.svd(covmat)
# Rectilinearity defined after Montalbetti & Kanasewich, 1970
rect = 1.0 - np.sqrt(eigenval[1] / eigenval[0])
# Planarity defined after [Jurkevics1988]_
plan = 1.0 - (2.0 * eigenval[2] / (eigenval[1] + eigenval[0]))
azimuth = math.degrees(math.atan2(eigvec[0][0], eigvec[1][0]))
eve = np.sqrt(eigvec[0][0] ** 2 + eigvec[1][0] ** 2)
incidence = math.degrees(math.atan2(eve, eigvec[2][0]))
if azimuth < 0.0:
azimuth = 360.0 + azimuth
if incidence < 0.0:
incidence += 180.0
if incidence > 90.0:
incidence = 180.0 - incidence
if azimuth > 180.0:
azimuth -= 180.0
else:
azimuth += 180.0
if azimuth > 180.0:
azimuth -= 180.0

return azimuth, incidence, rect, plan

[docs]def instantaneous_frequency(data, sampling_rate):
"""
Simple function to estimate the instantaneous frequency based on the
derivative of the data and the analytical (hilbert) data.

:param data: The data array.
:type data: :class:numpy.ndarray
:param sampling_rate: The sampling rate in Hz.
:type sampling_rate: float
"""
x = signal.hilbert(data)

instf = (x.real * dx.imag - x.imag * dx.real) / \
(2 * math.pi * (abs(x) ** 2))

return instf

[docs]def vidale_adapt(stream, noise_thres, fs, flow, fhigh, spoint, stime, etime):
"""
Adaptive window polarization analysis after [Vidale1986]_ with the
modification of adapted analysis window estimated by estimating the
instantaneous frequency. It returns the azimuth, incidence, rectilinearity
planarity and ellipticity.

:param stream: ZNE containing trace data
:type stream: :class:~obspy.core.stream.Stream
:param noise_thres: Variance of the noise sphere; data points are excluded
when falling within the sphere of radius sqrt(noise_thres),
Default = 0
:type noise_thres: float
:param fs: sampling rate
:type fs: float
:param flow: lower frequency limit for analysis
:type flow: float
:param fhigh: upper frequency limit for analysis
:type fhigh: float
:param spoint: array with traces' individual start times in samples
:type spoint: :class:numpy.ndarray
:param stime: start time of the analysis
:type stime: :class:~obspy.core.utcdatetime.UTCDateTime
:param etime: end time for the analysis
:type etime: :class:~obspy.core.utcdatetime.UTCDateTime
:returns: list of tuples containing azimuth, incidence, rectilinearity,
planarity, and ellipticity
"""
w = 3.0
# sort for ZNE
stream.sort(reverse=True)
z = stream[0].data
n = stream[1].data
e = stream[2].data

zi = instantaneous_frequency(z, fs)
za = signal.hilbert(z)
ni = instantaneous_frequency(n, fs)
na = signal.hilbert(n)
ei = instantaneous_frequency(e, fs)
ea = signal.hilbert(e)
res = []

offset = int(3 * fs / flow)
covmat = np.zeros([3, 3], dtype=np.complex128)
while True:
adapt = int(3. * w * fs / (zi[offset] + ni[offset] + ei[offset]))
# in order to account for errors in the inst freq estimation
if adapt > int(3 * fs / flow):
adapt = int(3 * fs / flow)
if adapt < int(3 * fs / fhigh):
adapt = int(3 * fs / fhigh)
# XXX: was adapt /= 2
newstart = stime + offset / fs
if (newstart + (adapt / 2) / fs) > etime:
break

zx = za[int(spoint[2] + offset - adapt / 2):
int(spoint[2] + offset + adapt / 2)]
nx = na[int(spoint[1] + offset - adapt / 2):
int(spoint[1] + offset + adapt / 2)]
ex = ea[int(spoint[0] + offset - adapt / 2):
int(spoint[0] + offset + adapt / 2)]
zx -= zx.mean()
nx -= nx.mean()
ex -= ex.mean()

covmat[0][0] = np.dot(ex, ex.conjugate())
covmat[0][1] = np.dot(ex, nx.conjugate())
covmat[1][0] = covmat[0][1].conjugate()
covmat[0][2] = np.dot(ex, zx.conjugate())
covmat[2][0] = covmat[0][2].conjugate()
covmat[1][1] = np.dot(nx, nx.conjugate())
covmat[1][2] = np.dot(zx, nx.conjugate())
covmat[2][1] = covmat[1][2].conjugate()
covmat[2][2] = np.dot(zx, zx.conjugate())

eigvec, eigenval, v = np.linalg.svd(covmat)

# very similar to function flinn, possible could be unified
def fun(x):
return 1. - math.sqrt(
((eigvec[0][0] * (math.cos(x) + math.sin(x) * 1j)).real) ** 2 +
((eigvec[1][0] * (math.cos(x) + math.sin(x) * 1j)).real) ** 2 +
((eigvec[2][0] * (math.cos(x) + math.sin(x) * 1j)).real) ** 2)

final = fminbound(fun, 0.0, math.pi, full_output=True)
x = 1. - final[1]
ellip = math.sqrt(1.0 - x ** 2) / x
# rectilinearity defined after Montalbetti & Kanasewich, 1970
rect = 1. - np.sqrt(eigenval[1] / eigenval[0])
# planarity defined after [Jurkevics1988]_
plan = 1. - (2.0 * eigenval[2] / (eigenval[1] + eigenval[0]))

azimuth = 180 * math.atan2(eigvec[0][0].real, eigvec[1][0].real) / \
math.pi
eve = np.sqrt(eigvec[0][0].real ** 2 + eigvec[1][0].real ** 2)

incidence = 180 * math.atan2(eve, eigvec[2][0].real) / math.pi
if azimuth < 0.0:
azimuth = 360.0 + azimuth
if incidence < 0.0:
incidence += 180.0
if incidence > 90.0:
incidence = 180.0 - incidence
if azimuth > 180.0:
azimuth -= 180.0
else:
azimuth += 180.0
if azimuth > 180.0:
azimuth -= 180.0

res.append((newstart.timestamp, azimuth, incidence, rect, plan, ellip))
offset += 1

return res

[docs]def particle_motion_odr(stream, noise_thres=0):
"""
Computes the orientation of the particle motion vector based on an
orthogonal regression algorithm.

:param stream: ZNE sorted trace data
:type stream: :class:~obspy.core.stream.Stream
:param noise_tres: variance of the noise sphere; data points are excluded
when falling within the sphere of radius sqrt(noise_thres)
:type noise_thres: float
:returns: azimuth, incidence, error of azimuth, error of incidence
"""
z = []
n = []
e = []
comp, npts = np.shape(stream)

for i in range(0, npts):
if (stream[0][i] ** 2 + stream[1][i] ** 2 + stream[2][i] ** 2) \
> noise_thres:
z.append(stream[0][i])
n.append(stream[1][i])
e.append(stream[2][i])

def fit_func(beta, x):
# XXX: Eventually this is correct: return beta[0] * x + beta[1]
return beta[0] * x

data = scipy.odr.Data(e, n)
model = scipy.odr.Model(fit_func)
odr = scipy.odr.ODR(data, model, beta0=[1.])
out = odr.run()
az_slope = out.beta[0]
az_error = out.sd_beta[0]

n = np.asarray(n)
e = np.asarray(e)
z = np.asarray(z)
r = np.sqrt(n ** 2 + e ** 2)

data = scipy.odr.Data(r, abs(z))
model = scipy.odr.Model(fit_func)
odr = scipy.odr.ODR(data, model, beta0=[1.0])
out = odr.run()
in_slope = out.beta[0]
in_error = out.sd_beta[0]

azimuth = math.atan2(1.0, az_slope)
incidence = math.atan2(1.0, in_slope)

az_error = 1.0 / ((1.0 ** 2 + az_slope ** 2) * azimuth) * az_error
# az_error = math.degrees(az_error)
in_error = 1.0 / ((1.0 ** 2 + in_slope ** 2) * incidence) * in_error
# in_error = math.degrees(in_error)

azimuth = math.degrees(azimuth)
incidence = math.degrees(incidence)

if azimuth < 0.0:
azimuth = 360.0 + azimuth
if incidence < 0.0:
incidence += 180.0
if incidence > 90.0:
incidence = 180.0 - incidence
if azimuth > 180.0:
azimuth -= 180.0
else:
azimuth += 180.0
if azimuth > 180.0:
azimuth -= 180.0

return azimuth, incidence, az_error, in_error

[docs]def _get_s_point(stream, stime, etime):
"""
Function for computing the trace dependent start time in samples

:param stime: time to start
:type stime: :class:~obspy.core.utcdatetime.UTCDateTime
:param etime: time to end
:type etime: :class:~obspy.core.utcdatetime.UTCDateTime
:returns: spoint, epoint
"""
slatest = stream[0].stats.starttime
eearliest = stream[0].stats.endtime
for tr in stream:
if tr.stats.starttime >= slatest:
slatest = tr.stats.starttime
if tr.stats.endtime <= eearliest:
eearliest = tr.stats.endtime

nostat = len(stream)
spoint = np.empty(nostat, dtype=np.int32)
epoint = np.empty(nostat, dtype=np.int32)
# now we have to adjust to the beginning of real start time
if slatest > stime:
msg = "Specified start time is before latest start time in stream"
raise ValueError(msg)
if eearliest < etime:
msg = "Specified end time is after earliest end time in stream"
raise ValueError(msg)
for i in range(nostat):
offset = int(((stime - slatest) / stream[i].stats.delta + 1.))
negoffset = int(((eearliest - etime) / stream[i].stats.delta + 1.))
diffstart = slatest - stream[i].stats.starttime
frac, _ = math.modf(diffstart)
spoint[i] = int(diffstart)
if frac > stream[i].stats.delta * 0.25:
msg = "Difference in start times exceeds 25% of sampling rate"
warnings.warn(msg)
spoint[i] += offset
diffend = stream[i].stats.endtime - eearliest
frac, _ = math.modf(diffend)
epoint[i] = int(diffend)
epoint[i] += negoffset

return spoint, epoint

[docs]def polarization_analysis(stream, win_len, win_frac, frqlow, frqhigh, stime,
etime, verbose=False, method="pm", var_noise=0.0):
"""
Method carrying out polarization analysis with the [Flinn1965b]_,
[Jurkevics1988]_, ParticleMotion, or [Vidale1986]_ algorithm.

:param stream: 3 component input data.
:type stream: :class:~obspy.core.stream.Stream
:param win_len: Sliding window length in seconds.
:type win_len: float
:param win_frac: Fraction of sliding window to use for step.
:type win_frac: float
:param var_noise: resembles a sphere of noise in PM where the 3C is
excluded
:type var_noise: float
:param frqlow: lower frequency for PM
:type frqlow: float
:param frqhigh: higher frequency for PM
:type frqhigh: float
:param stime: Start time of interest
:type stime: :class:obspy.core.utcdatetime.UTCDateTime
:param etime: End time of interest
:type etime: :class:obspy.core.utcdatetime.UTCDateTime
:param method: the method to use. one of "pm", "flinn" or
"vidale".
:type method: str
:rtype: dict
:returns: Dictionary with keys "timestamp" (POSIX timestamp, can be
used to initialize :class:~obspy.core.utcdatetime.UTCDateTime
objects), "azimuth", "incidence" (incidence angle) and
additional keys depending on used method: "azimuth_error" and
"incidence_error" (for method "pm"), "rectilinearity" and
"planarity" (for methods "flinn" and "vidale") and
"ellipticity" (for method "flinn"). Under each key a
:class:~numpy.ndarray is stored, giving the respective values
corresponding to the "timestamp" :class:~numpy.ndarray.
"""
if method.lower() not in ["pm", "flinn", "vidale"]:
msg = "Invalid method ('%s')" % method
raise ValueError(msg)

res = []

# check that sampling rates do not vary
fs = stream[0].stats.sampling_rate
if len(stream) != len(stream.select(sampling_rate=fs)):
msg = "sampling rates of traces in stream are not equal"
raise ValueError(msg)

if verbose:
print("stream contains following traces:")
print(stream)
print("stime = " + str(stime) + ", etime = " + str(etime))

spoint, _epoint = _get_s_point(stream, stime, etime)
if method.lower() == "vidale":
res = vidale_adapt(stream, var_noise, fs, frqlow, frqhigh, spoint,
stime, etime)
else:
nsamp = int(win_len * fs)
nstep = int(nsamp * win_frac)
newstart = stime
tap = cosine_taper(nsamp, p=0.22)
offset = 0
while (newstart + (nsamp + nstep) / fs) < etime:
try:
data = []
z = []
n = []
e = []
for i, tr in enumerate(stream):
dat = tr.data[spoint[i] + offset:
spoint[i] + offset + nsamp]
dat = (dat - dat.mean()) * tap
if "Z" in tr.stats.channel:
z = dat.copy()
if "N" in tr.stats.channel:
n = dat.copy()
if "E" in tr.stats.channel:
e = dat.copy()

data.append(z)
data.append(n)
data.append(e)
except IndexError:
break

# we plot against the centre of the sliding window
if method.lower() == "pm":
azimuth, incidence, error_az, error_inc = \
particle_motion_odr(data, var_noise)
res.append(np.array([newstart.timestamp + float(nstep) / fs,
azimuth, incidence, error_az, error_inc]))
if method.lower() == "flinn":
azimuth, incidence, reclin, plan = flinn(data, var_noise)
res.append(np.array([newstart.timestamp + float(nstep) / fs,
azimuth, incidence, reclin, plan]))

if verbose:
print(newstart, newstart + nsamp / fs, res[-1][1:])
offset += nstep

newstart += float(nstep) / fs

res = np.array(res)

result_dict = {"timestamp": res[:, 0],
"azimuth": res[:, 1],
"incidence": res[:, 2]}
if method.lower() == "pm":
result_dict["azimuth_error"] = res[:, 3]
result_dict["incidence_error"] = res[:, 4]
elif method.lower() == "vidale":
result_dict["rectilinearity"] = res[:, 3]
result_dict["planarity"] = res[:, 4]
result_dict["ellipticity"] = res[:, 5]
elif method.lower() == "flinn":
result_dict["rectilinearity"] = res[:, 3]
result_dict["planarity"] = res[:, 4]
return result_dict

if __name__ == "__main__":
import doctest
doctest.testmod(exclude_empty=True)