# -*- coding: utf-8 -*-
"""
CMTSOLUTION file format support for ObsPy.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import math
import uuid
import warnings
from obspy import UTCDateTime
from obspy.core.event import (Catalog, Comment, Event, EventDescription,
Origin, Magnitude, FocalMechanism, MomentTensor,
Tensor, SourceTimeFunction)
from obspy.geodetics import FlinnEngdahl
_fe = FlinnEngdahl()
[docs]def _get_resource_id(cmtname, res_type, tag=None):
"""
Helper function to create consistent resource ids.
"""
res_id = "smi:local/cmtsolution/%s/%s" % (cmtname, res_type)
if tag is not None:
res_id += "#" + tag
return res_id
[docs]def _buffer_proxy(filename_or_buf, function, reset_fp=True,
file_mode="rb", *args, **kwargs):
"""
Calls a function with an open file or file-like object as the first
argument. If the file originally was a filename, the file will be
opened, otherwise it will just be passed to the underlying function.
:param filename_or_buf: File to pass.
:type filename_or_buf: str, open file, or file-like object
:param function: The function to call.
:param reset_fp: If True, the file pointer will be set to the initial
position after the function has been called.
:type reset_fp: bool
:param file_mode: Mode to open file in if necessary.
"""
try:
position = filename_or_buf.tell()
is_buffer = True
except AttributeError:
is_buffer = False
if is_buffer is True:
ret_val = function(filename_or_buf, *args, **kwargs)
if reset_fp:
filename_or_buf.seek(position, 0)
return ret_val
else:
with open(filename_or_buf, file_mode) as fh:
return function(fh, *args, **kwargs)
[docs]def _is_cmtsolution(filename_or_buf):
"""
Checks if the file is a CMTSOLUTION file.
:param filename_or_buf: File to test.
:type filename_or_buf: str or file-like object
"""
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
return _buffer_proxy(filename_or_buf, _internal_is_cmtsolution,
reset_fp=True)
# Happens for example when passing the data as a string which would be
# interpreted as a filename.
except OSError:
return False
[docs]def _internal_is_cmtsolution(buf):
"""
Checks if the file is a CMTSOLUTION file.
:param buf: File to check.
:type buf: open file or file-like object
"""
# The file format is so simple. Just attempt to read the first event. If
# it passes it will be read again but that has really no
# significant performance impact.
try:
_internal_read_single_cmtsolution(buf)
return True
except Exception:
return False
[docs]def _read_cmtsolution(filename_or_buf, **kwargs):
"""
Reads a CMTSOLUTION file to a :class:`~obspy.core.event.Catalog` object.
:param filename_or_buf: File to read.
:type filename_or_buf: str or file-like object
"""
return _buffer_proxy(filename_or_buf, _internal_read_cmtsolution, **kwargs)
[docs]def _internal_read_cmtsolution(buf, **kwargs):
"""
Reads a CMTSOLUTION file to a :class:`~obspy.core.event.Catalog` object.
:param buf: File to read.
:type buf: open file or file-like object
"""
events = []
cur_pos = buf.tell()
# This also works with BytesIO and what not.
buf.seek(0, 2)
size = buf.tell()
buf.seek(cur_pos, 0)
# This is pretty inefficient due to all the file pointer jumping but
# performance is really the least of our concerns. Also most performance
# is still lost initializing the large ObsPy event objects.
while True:
if buf.tell() >= size:
break
line = buf.readline().strip()
# If there is something, jump back to the beginning of the line and
# read the next event.
if line:
buf.seek(cur_pos, 0)
events.append(_internal_read_single_cmtsolution(buf))
cur_pos = buf.tell()
return Catalog(resource_id=_get_resource_id("catalog", str(uuid.uuid4())),
events=events)
[docs]def _internal_read_single_cmtsolution(buf):
"""
Reads a single CMTSOLUTION file to a :class:`~obspy.core.event.Catalog`
object.
:param buf: File to read.
:type buf: open file or file-like object
"""
# The first line encodes the preliminary epicenter.
line = buf.readline()
hypocenter_catalog = line[:5].strip().decode()
origin_time = line[5:].strip().split()[:6]
values = list(map(int, origin_time[:-1])) + \
[float(origin_time[-1])]
try:
origin_time = UTCDateTime(*values)
except (TypeError, ValueError):
warnings.warn("Could not determine origin time from line: %s. Will "
"be set to zero." % line)
origin_time = UTCDateTime(0)
line = line[28:].split()
latitude, longitude, depth, body_wave_mag, surface_wave_mag = \
map(float, line[:5])
# The rest encodes the centroid solution.
event_name = buf.readline().strip().split()[-1].decode()
preliminary_origin = Origin(
resource_id=_get_resource_id(event_name, "origin", tag="prelim"),
time=origin_time,
longitude=longitude,
latitude=latitude,
# Depth is in meters.
depth=depth * 1000.0,
origin_type="hypocenter",
region=_fe.get_region(longitude=longitude, latitude=latitude),
evaluation_status="preliminary"
)
preliminary_bw_magnitude = Magnitude(
resource_id=_get_resource_id(event_name, "magnitude", tag="prelim_bw"),
mag=body_wave_mag, magnitude_type="Mb",
evaluation_status="preliminary",
origin_id=preliminary_origin.resource_id)
preliminary_sw_magnitude = Magnitude(
resource_id=_get_resource_id(event_name, "magnitude", tag="prelim_sw"),
mag=surface_wave_mag, magnitude_type="MS",
evaluation_status="preliminary",
origin_id=preliminary_origin.resource_id)
values = ["time_shift", "half_duration", "latitude", "longitude",
"depth", "m_rr", "m_tt", "m_pp", "m_rt", "m_rp", "m_tp"]
cmt_values = {_i: float(buf.readline().strip().split()[-1])
for _i in values}
# Moment magnitude calculation in dyne * cm.
m_0 = 1.0 / math.sqrt(2.0) * math.sqrt(
cmt_values["m_rr"] ** 2 +
cmt_values["m_tt"] ** 2 +
cmt_values["m_pp"] ** 2 +
2.0 * cmt_values["m_rt"] ** 2 +
2.0 * cmt_values["m_rp"] ** 2 +
2.0 * cmt_values["m_tp"] ** 2)
m_w = 2.0 / 3.0 * (math.log10(m_0) - 16.1)
# Convert to meters.
cmt_values["depth"] *= 1000.0
# Convert to Newton meter.
values = ["m_rr", "m_tt", "m_pp", "m_rt", "m_rp", "m_tp"]
for value in values:
cmt_values[value] /= 1E7
cmt_origin = Origin(
resource_id=_get_resource_id(event_name, "origin", tag="cmt"),
time=origin_time + cmt_values["time_shift"],
longitude=cmt_values["longitude"],
latitude=cmt_values["latitude"],
depth=cmt_values["depth"],
origin_type="centroid",
# Could rarely be different than the epicentral region.
region=_fe.get_region(longitude=cmt_values["longitude"],
latitude=cmt_values["latitude"])
# No evaluation status as it could be any of several and the file
# format does not provide that information.
)
cmt_mag = Magnitude(
resource_id=_get_resource_id(event_name, "magnitude", tag="mw"),
# Round to 2 digits.
mag=round(m_w, 2),
magnitude_type="mw",
origin_id=cmt_origin.resource_id
)
foc_mec = FocalMechanism(
resource_id=_get_resource_id(event_name, "focal_mechanism"),
# The preliminary origin most likely triggered the focal mechanism
# determination.
triggering_origin_id=preliminary_origin.resource_id
)
tensor = Tensor(
m_rr=cmt_values["m_rr"],
m_pp=cmt_values["m_pp"],
m_tt=cmt_values["m_tt"],
m_rt=cmt_values["m_rt"],
m_rp=cmt_values["m_rp"],
m_tp=cmt_values["m_tp"]
)
# Source time function is a triangle, according to the SPECFEM manual.
stf = SourceTimeFunction(
type="triangle",
# The duration is twice the half duration.
duration=2.0 * cmt_values["half_duration"]
)
mt = MomentTensor(
resource_id=_get_resource_id(event_name, "moment_tensor"),
derived_origin_id=cmt_origin.resource_id,
moment_magnitude_id=cmt_mag.resource_id,
# Convert to Nm.
scalar_moment=m_0 / 1E7,
tensor=tensor,
source_time_function=stf
)
# Assemble everything.
foc_mec.moment_tensor = mt
ev = Event(resource_id=_get_resource_id(event_name, "event"),
event_type="earthquake")
ev.event_descriptions.append(EventDescription(text=event_name,
type="earthquake name"))
ev.comments.append(Comment(
text="Hypocenter catalog: %s" % hypocenter_catalog,
force_resource_id=False))
ev.origins.append(cmt_origin)
ev.origins.append(preliminary_origin)
ev.magnitudes.append(cmt_mag)
ev.magnitudes.append(preliminary_bw_magnitude)
ev.magnitudes.append(preliminary_sw_magnitude)
ev.focal_mechanisms.append(foc_mec)
# Set the preferred items.
ev.preferred_origin_id = cmt_origin.resource_id.id
ev.preferred_magnitude_id = cmt_mag.resource_id.id
ev.preferred_focal_mechanism_id = foc_mec.resource_id.id
ev.scope_resource_ids()
return ev
[docs]def _write_cmtsolution(catalog, filename_or_buf, **kwargs):
"""
Write an event to a file.
:param catalog: The catalog to write. Can only contain one event.
:type catalog: :class:`~obspy.core.event.Catalog`
:param filename_or_buf: Filename or file-like object to write to.
:type filename_or_buf: str, open file, or file-like object
"""
return _buffer_proxy(filename_or_buf, _internal_write_cmtsolution,
file_mode="wb", catalog=catalog, **kwargs)
[docs]def _internal_write_cmtsolution(buf, catalog, **kwargs):
"""
Write events to a file.
:param buf: File to write to.
:type buf: open file or file-like object
:param catalog: The catalog to write.
:type catalog: :class:`~obspy.core.event.Catalog`
"""
# Some sanity checks.
if len(catalog) < 1:
raise ValueError("Catalog must contain at least one event")
for event in catalog:
_internal_write_single_cmtsolution(buf, event)
# Add an empty line between events.
if len(catalog) > 1:
buf.write(b"\n")
[docs]def _internal_write_single_cmtsolution(buf, event, **kwargs):
"""
Write an event to a file.
:param buf: File to write to.
:type buf: open file or file-like object
:param event: The event to write.
:type event: :class:`~obspy.core.event.event.Event`
"""
if not event.focal_mechanisms:
raise ValueError("Event must contain a focal mechanism.")
foc_mec = event.preferred_focal_mechanism() or event.focal_mechanisms[0]
if not foc_mec.moment_tensor:
raise ValueError("The preferred or first focal mechanism must "
"contain a moment tensor.")
mt = foc_mec.moment_tensor
if not mt.tensor:
raise ValueError("The preferred or first focal mechanism must "
"contain a moment tensor element with an actual "
"tensor.")
if not event.origins:
raise ValueError("Event must have at least one origin.")
if not event.magnitudes:
raise ValueError("Event must have at least one magnitude.")
# Attempt to get the body and surface wave magnitudes.
mb_candidates = \
[_i for _i in event.magnitudes if _i.magnitude_type == "Mb"]
ms_candidates = \
[_i for _i in event.magnitudes if _i.magnitude_type == "MS"]
if not mb_candidates:
warnings.warn("No body wave magnitude found. Will be replaced by the "
"first magnitude in the event object.")
mb_mag = event.magnitudes[0]
else:
mb_mag = mb_candidates[0]
if not ms_candidates:
warnings.warn("No surface wave magnitude found. Will be replaced by "
"the first magnitude in the event object.")
ms_mag = event.magnitudes[0]
else:
ms_mag = ms_candidates[0]
# Now find the cmt origin. First attempt to get the derived origin of
# the moment tensor,
if mt.derived_origin_id:
cmt_origin = mt.derived_origin_id.get_referred_object()
# Otherwise try to find the first one that is CMT
else:
candidates = [_i for _i in event.origins
if _i.origin_type == "centroid"]
if candidates:
warnings.warn("No derived origin attached to the moment tensor. "
"Will instead use another centroid origin to be "
"written to the file.")
cmt_origin = candidates[0]
# Otherwise just take the preferred or first one.
else:
warnings.warn("Could not find a centroid origin. Will instead "
"assume that the preferred or first origin is the "
"centroid origin.")
cmt_origin = event.preferred_origin() or event.origins[0]
# Next step is to find a hypocentral origin.
candidates = [_i for _i in event.origins
if _i.origin_type == "hypocenter"]
if candidates:
hypo_origin = candidates[0]
# Otherwise get the first one that is not equal to the CMT origin.
else:
if len(event.origins) == 1:
warnings.warn("Hypocentral origin will be identical to the "
"centroid one.")
hypo_origin = event.origins[0]
else:
warnings.warn("No hypocentral origin could be found. Will choose "
"the first one that is not identical to the "
"centroid origin.")
hypo_origin = [_i for _i in event.origins if _i != cmt_origin][0]
# Try to find the half duration.
if mt.source_time_function:
if mt.source_time_function.duration:
half_duration = mt.source_time_function.duration / 2.0
else:
warnings.warn("Source time function has no duration. The half "
"duration will be set to 1.0.")
half_duration = 1.0
else:
warnings.warn("Moment tensor has no source time function. Half "
"duration will be set to 1.0.")
half_duration = 1.0
# Now attempt to retrieve the event name. Otherwise just get a random one.
event_name = None
if event.event_descriptions:
candidates = [_i for _i in event.event_descriptions
if _i.type == "earthquake name"]
if candidates:
event_name = candidates[0].text
if event_name is None:
event_name = str(uuid.uuid4())[:6].upper()
# Also attempt to retrieve the determination type.
hypocenter_catalog = "PDE"
if event.comments:
candidates = [
_i for _i in event.comments
if _i.text.lower().strip().startswith("hypocenter catalog:")]
if candidates:
hypocenter_catalog = \
candidates[0].text.strip().split(":")[-1].upper()
template = (
"{hypocenter_catalog:>4} {year:4d} {month:02d} {day:02d} {hour:02d} "
"{minute:02d} {second:05.2f} "
"{latitude:9.4f} {longitude:9.4f} {depth:5.1f} {mb:.1f} {ms:.1f} "
"{region}\n"
"event name:{event_name:>17}\n"
"time shift:{time_shift:17.4f}\n"
"half duration:{half_duration:14.4f}\n"
"latitude:{cmt_latitude:19.4f}\n"
"longitude:{cmt_longitude:18.4f}\n"
"depth:{cmt_depth:22.4f}\n"
"Mrr:{m_rr:24.6E}\n"
"Mtt:{m_tt:24.6E}\n"
"Mpp:{m_pp:24.6E}\n"
"Mrt:{m_rt:24.6E}\n"
"Mrp:{m_rp:24.6E}\n"
"Mtp:{m_tp:24.6E}\n"
)
template = template.format(
hypocenter_catalog=hypocenter_catalog,
year=hypo_origin.time.year,
month=hypo_origin.time.month,
day=hypo_origin.time.day,
hour=hypo_origin.time.hour,
minute=hypo_origin.time.minute,
second=float(hypo_origin.time.second) +
hypo_origin.time.microsecond / 1E6,
latitude=hypo_origin.latitude,
longitude=hypo_origin.longitude,
depth=hypo_origin.depth / 1000.0,
mb=mb_mag.mag,
ms=ms_mag.mag,
region=_fe.get_region(longitude=hypo_origin.longitude,
latitude=hypo_origin.latitude),
event_name=event_name,
time_shift=cmt_origin.time - hypo_origin.time,
half_duration=half_duration,
cmt_latitude=cmt_origin.latitude,
cmt_longitude=cmt_origin.longitude,
cmt_depth=cmt_origin.depth / 1000.0,
# Convert to dyne * cm.
m_rr=mt.tensor.m_rr * 1E7,
m_tt=mt.tensor.m_tt * 1E7,
m_pp=mt.tensor.m_pp * 1E7,
m_rt=mt.tensor.m_rt * 1E7,
m_rp=mt.tensor.m_rp * 1E7,
m_tp=mt.tensor.m_tp * 1E7
)
# Write to a buffer/file opened in binary mode.
buf.write(template.encode())