Source code for obspy.io.scardec.core

# -*- coding: utf-8 -*-
"""
SCARDEC file format support for ObsPy.
These files contain source solutions calculated at IPGP using the
SCARDEC method [Vallee2011]_. These solutions contain a focal mechanism
and a source time function in form of a regularly sampled moment rate.
An equivalent moment tensor is automatically calculated. Writing of SCARDEC
files is only possible, if the event contains a moment rate function.

:copyright:
    The ObsPy Development Team (devs@obspy.org)
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import uuid
import warnings

import numpy as np

from obspy import UTCDateTime
from obspy.core.event import (Catalog, Comment, Event, EventDescription,
                              Origin, Magnitude, FocalMechanism, MomentTensor,
                              Tensor, SourceTimeFunction, NodalPlane,
                              NodalPlanes)
from obspy.geodetics import FlinnEngdahl


_fe = FlinnEngdahl()


[docs] def _get_resource_id(scardecname, res_type, tag=None): """ Helper function to create consistent resource ids. """ res_id = "smi:local/scardec/%s/%s" % (scardecname, res_type) if tag is not None: res_id += "#" + tag return res_id
[docs] def _buffer_proxy(filename_or_buf, function, reset_fp=True, file_mode="rb", *args, **kwargs): """ Calls a function with an open file or file-like object as the first argument. If the file originally was a filename, the file will be opened, otherwise it will just be passed to the underlying function. :param filename_or_buf: File to pass. :type filename_or_buf: str, open file, or file-like object :param function: The function to call. :param reset_fp: If True, the file pointer will be set to the initial position after the function has been called. :type reset_fp: bool :param file_mode: Mode to open file in if necessary. """ try: position = filename_or_buf.tell() is_buffer = True except AttributeError: is_buffer = False if is_buffer is True: ret_val = function(filename_or_buf, *args, **kwargs) if reset_fp: filename_or_buf.seek(position, 0) return ret_val else: with open(filename_or_buf, file_mode) as fh: return function(fh, *args, **kwargs)
[docs] def _is_scardec(filename_or_buf): """ Checks if the file is a SCARDEC file. :param filename_or_buf: File to test. :type filename_or_buf: str or file-like object """ try: return _buffer_proxy(filename_or_buf, _internal_is_scardec, reset_fp=True) # Happens for example when passing the data as a string which would be # interpreted as a filename. except OSError: return False
[docs] def _internal_is_scardec(buf): """ Checks if the file is a SCARDEC file. :param buf: File to check. :type buf: open file or file-like object """ # The file format is so simple. Just attempt to read the first event. If # it passes it will be read again but that has really no # significant performance impact. try: _internal_read_single_scardec(buf) return True except Exception: return False
[docs] def _read_scardec(filename_or_buf, **kwargs): """ Reads a SCARDEC file to a :class:`~obspy.core.event.Catalog` object. :param filename_or_buf: File to read. :type filename_or_buf: str or file-like object """ return _buffer_proxy(filename_or_buf, _internal_read_scardec, **kwargs)
[docs] def _internal_read_scardec(buf, **kwargs): # @UnusedVariable """ Reads a SCARDEC file to a :class:`~obspy.core.event.Catalog` object. :param buf: File to read. :type buf: open file or file-like object """ events = [] cur_pos = buf.tell() # This also works with BytesIO and what not. buf.seek(0, 2) size = buf.tell() buf.seek(cur_pos, 0) # This is pretty inefficient due to all the file pointer jumping but # performance is really the least of our concerns. Also most performance # is still lost initializing the large ObsPy event objects. while True: if buf.tell() >= size: break line = buf.readline().strip() # If there is something, jump back to the beginning of the line and # read the next event. if line: buf.seek(cur_pos, 0) events.append(_internal_read_single_scardec(buf)) cur_pos = buf.tell() return Catalog(resource_id=_get_resource_id("catalog", str(uuid.uuid4())), events=events)
[docs] def _internal_read_single_scardec(buf): """ Reads a single SCARDEC file to a :class:`~obspy.core.event.Catalog` object. :param buf: File to read. :type buf: open file or file-like object """ # The first line encodes the origin time and epicenter line = buf.readline() origin_time = line.strip().split()[:6] values = list(map(int, origin_time[:-1])) + \ [float(origin_time[-1])] try: origin_time = UTCDateTime(*values) except (TypeError, ValueError): warnings.warn("Could not determine origin time from line: %s. Will " "be set to zero." % line) origin_time = UTCDateTime(0) line = line.split()[6:] latitude, longitude = map(float, line[:2]) # The second line encodes depth and the two focal mechanisms line = buf.readline() line = line.split() # First three values are depth, scalar moment (in Nm) and moment magnitude depth, scalar_moment, moment_mag = map(float, line[0:3]) # depth is in km in SCARDEC files depth *= 1e3 # Next six values are strike, dip, rake for both planes strike1, dip1, rake1 = map(float, line[3:6]) strike2, dip2, rake2 = map(float, line[6:9]) # The rest of the file is the moment rate function # In each line: time (sec), moment rate (Nm/sec) stf_time = [] stf_mr = [] for line in buf: stf_time.append(float(line.split()[0])) stf_mr.append(float(line.split()[1])) # Normalize the source time function stf_mr = np.array(stf_mr) stf_mr /= scalar_moment # Calculate the time step dt = np.mean(np.diff(stf_time)) # Calculate the stf offset (time of first sample wrt to origin time) offset = stf_time[0] # event name is set to generic value for now event_name = 'SCARDEC_event' cmt_origin = Origin( resource_id=_get_resource_id(event_name, "origin", tag="cmt"), time=origin_time, longitude=longitude, latitude=latitude, depth=depth, origin_type="centroid", region=_fe.get_region(longitude=longitude, latitude=latitude) ) cmt_mag = Magnitude( resource_id=_get_resource_id(event_name, "magnitude", tag="mw"), mag=moment_mag, magnitude_type="mw", origin_id=cmt_origin.resource_id ) nod1 = NodalPlane(strike=strike1, dip=dip1, rake=rake1) nod2 = NodalPlane(strike=strike2, dip=dip2, rake=rake2) nod = NodalPlanes(nodal_plane_1=nod1, nodal_plane_2=nod2) foc_mec = FocalMechanism( resource_id=_get_resource_id(event_name, "focal_mechanism"), nodal_planes=nod ) dip1 *= np.pi / 180. rake1 *= np.pi / 180. strike1 *= np.pi / 180. mxx = - scalar_moment * ((np.sin(dip1) * np.cos(rake1) * np.sin(2 * strike1)) + (np.sin(2 * dip1) * np.sin(rake1) * np.sin(2 * strike1))) mxy = scalar_moment * ((np.sin(dip1) * np.cos(rake1) * np.cos(2 * strike1)) + (np.sin(2 * dip1) * np.sin(rake1) * np.sin(2 * strike1) * 0.5)) myy = scalar_moment * ((np.sin(dip1) * np.cos(rake1) * np.sin(2 * strike1)) - (np.sin(2 * dip1) * np.sin(rake1) * np.cos(2 * strike1))) mxz = - scalar_moment * ((np.cos(dip1) * np.cos(rake1) * np.cos(strike1)) + (np.cos(2 * dip1) * np.sin(rake1) * np.sin(strike1))) myz = - scalar_moment * ((np.cos(dip1) * np.cos(rake1) * np.sin(strike1)) - (np.cos(2 * dip1) * np.sin(rake1) * np.cos(strike1))) mzz = scalar_moment * (np.sin(2 * dip1) * np.sin(rake1)) tensor = Tensor(m_rr=mxx, m_tt=myy, m_pp=mzz, m_rt=mxy, m_rp=mxz, m_tp=myz) cm = [Comment(text="Basis system: North,East,Down \ (Jost and Herrmann 1989)")] cm[0].resource_id = _get_resource_id(event_name, 'comment', 'mt') cm.append(Comment(text="MT derived from focal mechanism, therefore \ constrained to pure double couple.", force_resource_id=False)) # Write moment rate function extra = {'moment_rate': {'value': stf_mr, 'namespace': r"http://test.org/xmlns/0.1"}, 'dt': {'value': dt, 'namespace': r"http://test.org/xmlns/0.1"}, 'offset': {'value': offset, 'namespace': r"http://test.org/xmlns/0.1"} } # Source time function stf = SourceTimeFunction(type="unknown") stf.extra = extra mt = MomentTensor( resource_id=_get_resource_id(event_name, "moment_tensor"), derived_origin_id=cmt_origin.resource_id, moment_magnitude_id=cmt_mag.resource_id, scalar_moment=scalar_moment, tensor=tensor, source_time_function=stf, comments=cm ) # Assemble everything. foc_mec.moment_tensor = mt ev = Event(resource_id=_get_resource_id(event_name, "event"), event_type="earthquake") ev.event_descriptions.append(EventDescription(text=event_name, type="earthquake name")) ev.comments.append(Comment( text="Hypocenter catalog: SCARDEC", force_resource_id=False)) ev.origins.append(cmt_origin) ev.magnitudes.append(cmt_mag) ev.focal_mechanisms.append(foc_mec) # Set the preferred items. ev.preferred_origin_id = cmt_origin.resource_id.id ev.preferred_magnitude_id = cmt_mag.resource_id.id ev.preferred_focal_mechanism_id = foc_mec.resource_id.id ev.scope_resource_ids() return ev
[docs] def _write_scardec(catalog, filename_or_buf, **kwargs): """ Write an event to a file. :param catalog: The catalog to write. Can only contain one event. :type catalog: :class:`~obspy.core.event.Catalog` :param filename_or_buf: Filename or file-like object to write to. :type filename_or_buf: str, open file, or file-like object """ return _buffer_proxy(filename_or_buf, _internal_write_scardec, file_mode="wb", catalog=catalog, **kwargs)
[docs] def _internal_write_scardec(buf, catalog, **kwargs): # @UnusedVariable """ Write events to a file. :param buf: File to write to. :type buf: open file or file-like object :param catalog: The catalog to write. :type catalog: :class:`~obspy.core.event.Catalog` """ # Some sanity checks. if len(catalog) < 1: raise ValueError("Catalog must contain at least one event") for event in catalog: _internal_write_single_scardec(buf, event) # Add an empty line between events. if len(catalog) > 1: buf.write(b"\n")
[docs] def _internal_write_single_scardec(buf, event, **kwargs): # @UnusedVariable """ Write an event to a file. :param buf: File to write to. :type buf: open file or file-like object :param event: The event to write. :type event: :class:`~obspy.core.event.event.Event` """ if not event.focal_mechanisms: raise ValueError("Event must contain a focal mechanism.") foc_mec = event.preferred_focal_mechanism() or event.focal_mechanisms[0] if not event.origins: raise ValueError("Event must have at least one origin.") if not event.magnitudes: raise ValueError("Event must have at least one magnitude.") # Attempt to get a moment magnitude mw_candidates = \ [_i for _i in event.magnitudes if _i.magnitude_type == "Mw"] if not mw_candidates: warnings.warn("No moment wave magnitude found. Will be replaced by the" " first magnitude in the event object.") mw_mag = event.magnitudes[0] else: mw_mag = mw_candidates[0] # Now find the cmt origin. Try to find the first one that is CMT candidates = [_i for _i in event.origins if _i.origin_type == "centroid"] if candidates: warnings.warn("No derived origin attached to the moment tensor. " "Will instead use another centroid origin to be " "written to the file.") cmt_origin = candidates[0] # Otherwise just take the preferred or first one. else: warnings.warn("Could not find a centroid origin. Will instead " "assume that the preferred or first origin is the " "centroid origin.") cmt_origin = event.preferred_origin() or event.origins[0] if "extra" not in foc_mec.moment_tensor.source_time_function: raise ValueError('Event moment tensor must contain a source time \ function to be written in SCARDEC format') # Now attempt to retrieve the event name. Otherwise just get a random one. event_name = None if event.event_descriptions: candidates = [_i for _i in event.event_descriptions if _i.type == "earthquake name"] if candidates: event_name = candidates[0].text if event_name is None: event_name = str(uuid.uuid4())[:6].upper() template = ( "{year:4d} {month:02d} {day:02d} {hour:02d} " "{minute:02d} {second:04.1f} " "{latitude:9.4f} {longitude:9.4f}\n" "{depth:5.1f} {scalmom:9.3E} {mw:5.3f}" "{strike1:4d} {dip1:4d} {rake1:4d}" "{strike2:4d} {dip2:4d} {rake2:4d}\n" ) np1 = foc_mec.nodal_planes.nodal_plane_1 np2 = foc_mec.nodal_planes.nodal_plane_2 template = template.format( year=cmt_origin.time.year, month=cmt_origin.time.month, day=cmt_origin.time.day, hour=cmt_origin.time.hour, minute=cmt_origin.time.minute, second=float(cmt_origin.time.second) + cmt_origin.time.microsecond / 1E6, latitude=cmt_origin.latitude, longitude=cmt_origin.longitude, depth=cmt_origin.depth / 1000.0, scalmom=foc_mec.moment_tensor.scalar_moment, mw=mw_mag.mag, strike1=int(np1.strike), dip1=int(np1.dip), rake1=int(np1.rake), strike2=int(np2.strike), dip2=int(np2.dip), rake2=int(np2.rake) ) buf.write(template.encode('ascii', 'strict')) # Write to a buffer/file opened in binary mode. stf = foc_mec.moment_tensor.source_time_function.extra t_offset = stf['offset']['value'] scalmom = foc_mec.moment_tensor.scalar_moment nsamples = len(stf['moment_rate']['value']) times = np.arange(0, nsamples) * stf['dt']['value'] + t_offset samples = stf['moment_rate']['value'] * scalmom np.savetxt(buf, np.asarray([times, samples]).T, fmt=' %16.9E %16.9E'.encode('ascii', 'strict'))