# -*- coding: utf-8 -*-
# ------------------------------------------------------------------
# Filename: calibration.py
# Purpose: Functions for relative calibration (e.g. Huddle test calibration)
# Author: Felix Bernauer, Simon Kremers
# Email: bernauer@geophysik.uni-muenchen.de
#
# Copyright (C) 2011 Felix Bernauer, Simon Kremers
# --------------------------------------------------------------------
"""
Functions for relative calibration.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import numpy as np
from obspy.core.stream import Stream
from obspy.core.trace import Trace
from obspy.io.gse2.paz import read_paz
from obspy.signal.invsim import paz_to_freq_resp
from obspy.signal.konnoohmachismoothing import konno_ohmachi_smoothing
from obspy.signal.util import next_pow_2
[docs]
def rel_calib_stack(st1, st2, calib_file, window_len, overlap_frac=0.5,
smooth=0, save_data=True):
"""
Method for relative calibration of sensors using a sensor with known
transfer function
:param st1: Stream or Trace object, (known)
:param st2: Stream or Trace object, (unknown)
:type calib_file: str or dict
:param calib_file: file name of calibration file containing the PAZ of the
known instrument in GSE2 standard or a dictionary with poles and zeros
information (with keys ``'poles'``, ``'zeros'`` and ``'sensitivity'``).
:type window_len: float
:param window_len: length of sliding window in seconds
:type overlap_frac: float
:param overlap_frac: fraction of overlap, defaults to fifty percent (0.5)
:type smooth: float
:param smooth: variable that defines if the Konno-Ohmachi taper is used or
not. default = 0 -> no taper generally used in geopsy: smooth = 40
:type save_data: bool
:param save_data: Whether or not to save the result to a file. If True, two
output files will be created:
* The new response in station_name.window_length.resp
* The ref response in station_name.refResp
Defaults to True
:returns: frequency, amplitude and phase spectrum
implemented after rel_calib_stack.c by M.Ohrnberger and J.Wassermann.
"""
# transform given trace objects to streams
if isinstance(st1, Trace):
st1 = Stream([st1])
if isinstance(st2, Trace):
st2 = Stream([st2])
# check if sampling rate and trace length is the same
if st1[0].stats.npts != st2[0].stats.npts:
msg = "Traces don't have the same length!"
raise ValueError(msg)
elif st1[0].stats.sampling_rate != st2[0].stats.sampling_rate:
msg = "Traces don't have the same sampling rate!"
raise ValueError(msg)
else:
sampfreq = st1[0].stats.sampling_rate
# read waveforms
tr1 = st1[0].data.astype(np.float64)
tr2 = st2[0].data.astype(np.float64)
# get window length, nfft and frequency step
ndat = int(window_len * sampfreq)
nfft = next_pow_2(ndat)
# read calib file and calculate response function
gg, _freq = _calc_resp(calib_file, nfft, sampfreq)
# calculate number of windows and overlap
noverlap = nfft * overlap_frac
auto, _freq, _t = \
spectral_helper(tr1, tr1, NFFT=nfft, Fs=sampfreq, noverlap=noverlap)
cross, freq, _t = \
spectral_helper(tr2, tr1, NFFT=nfft, Fs=sampfreq, noverlap=noverlap)
# get number of windows that were actually computed inside FFT routine
nwin = auto.shape[1]
res = (cross / auto).sum(axis=1) * gg
# The first item might be zero. Problems with phase calculations.
res = res[1:]
freq = freq[1:]
gg = gg[1:]
res /= nwin
# apply Konno-Ohmachi smoothing taper if chosen
if smooth > 0:
# Write in one matrix for performance reasons.
spectra = np.empty((2, len(res.real)))
spectra[0] = res.real
spectra[1] = res.imag
new_spectra = \
konno_ohmachi_smoothing(spectra, freq, bandwidth=smooth, count=1,
max_memory_usage=1024, normalize=True)
res.real = new_spectra[0]
res.imag = new_spectra[1]
amp = np.abs(res)
# include phase unwrapping
phase = np.unwrap(np.angle(res)) # + 2.0 * np.pi
ra = np.abs(gg)
rpha = np.unwrap(np.angle(gg))
if save_data:
trans_new = (st2[0].stats.station + "." + st2[0].stats.channel +
"." + str(window_len) + ".resp")
trans_ref = st1[0].stats.station + ".refResp"
# Create empty array for easy saving
temp = np.empty((len(freq), 3))
temp[:, 0] = freq
temp[:, 1] = amp
temp[:, 2] = phase
np.savetxt(trans_new, temp, fmt='%.10f')
temp[:, 1] = ra
temp[:, 2] = rpha
np.savetxt(trans_ref, temp, fmt='%.10f')
return freq, amp, phase
[docs]
def _calc_resp(calfile, nfft, sampfreq):
"""
Calculate transfer function of known system.
:type calfile: str
:param calfile: file containing poles, zeros and scale factor for known
system or a dictionary with poles and zeros information (with keys
``'poles'``, ``'zeros'`` and ``'sensitivity'``).
:returns: complex transfer function, array of frequencies
"""
# test if calfile is a paz dict
if isinstance(calfile, dict):
paz = calfile
# or read paz file if a filename is specified
else:
paz = dict()
paz['poles'], paz['zeros'], paz['sensitivity'] = read_paz(calfile)
# calculate transfer function
h, f = paz_to_freq_resp(paz['poles'], paz['zeros'], paz['sensitivity'],
1.0 / sampfreq, nfft, freq=True)
return h, f
# A modified copy of the Matplotlib 0.99.1.1 method spectral_helper found in
# .../matlab/mlab.py.
# Some function were changed to avoid additional dependencies. Included here as
# it is essential for the above rel_calib_stack function and only present in
# recent matplotlib versions.
# This is a helper function that implements the commonality between the
# psd, csd, and spectrogram. It is *NOT* meant to be used outside of mlab
[docs]
def spectral_helper(x, y, NFFT=256, Fs=2, noverlap=0, pad_to=None, # noqa
sides='default', scale_by_freq=None):
# The checks for if y is x are so that we can use the same function to
# implement the core of psd(), csd(), and spectrogram() without doing
# extra calculations. We return the unaveraged Pxy, freqs, and t.
same_data = y is x
# Make sure we're dealing with a NumPy array. If y and x were the same
# object to start with, keep them that way
x = np.asarray(x)
if not same_data:
y = np.asarray(y)
# zero pad x and y up to NFFT if they are shorter than NFFT
if len(x) < NFFT:
n = len(x)
x = np.resize(x, (NFFT,))
x[n:] = 0
if not same_data and len(y) < NFFT:
n = len(y)
y = np.resize(y, (NFFT,))
y[n:] = 0
if pad_to is None:
pad_to = NFFT
if scale_by_freq is None:
scale_by_freq = True
# For real x, ignore the negative frequencies unless told otherwise
if (sides == 'default' and np.iscomplexobj(x)) or sides == 'twosided':
num_freqs = pad_to
scaling_factor = 1.
elif sides in ('default', 'onesided'):
num_freqs = pad_to // 2 + 1
scaling_factor = 2.
else:
raise ValueError("sides must be one of: 'default', 'onesided', or "
"'twosided'")
# Matlab divides by the sampling frequency so that density function
# has units of dB/Hz and can be integrated by the plotted frequency
# values. Perform the same scaling here.
if scale_by_freq:
scaling_factor /= Fs
window_vals = np.hanning(NFFT)
step = int(NFFT) - int(noverlap)
ind = np.arange(0, len(x) - NFFT + 1, step, dtype=np.int32)
n = len(ind)
p_xy = np.zeros((num_freqs, n), np.complex128)
# do the ffts of the slices
for i in range(n):
this_x = x[ind[i]:ind[i] + NFFT]
this_x = window_vals * this_x
fx = np.fft.fft(this_x, n=pad_to)
if same_data:
fy = fx
else:
th_is_y = y[ind[i]:ind[i] + NFFT]
th_is_y = window_vals * th_is_y
fy = np.fft.fft(th_is_y, n=pad_to)
p_xy[:, i] = np.conjugate(fx[:num_freqs]) * fy[:num_freqs]
# Scale the spectrum by the norm of the window to compensate for
# windowing loss; see Bendat & Piersol Sec 11.5.2. Also include
# scaling factors for one-sided densities and dividing by the sampling
# frequency, if desired.
p_xy *= scaling_factor / (np.abs(window_vals) ** 2).sum()
t = 1. / Fs * (ind + NFFT / 2.)
freqs = float(Fs) / pad_to * np.arange(num_freqs)
if (np.iscomplexobj(x) and sides == 'default') or sides == 'twosided':
# center the frequency range at zero
freqs = np.concatenate((freqs[num_freqs // 2:] - Fs,
freqs[:num_freqs // 2]))
p_xy = np.concatenate((p_xy[num_freqs // 2:, :],
p_xy[:num_freqs // 2, :]), 0)
return p_xy, freqs, t