Source code for obspy.io.hypodd.pha

# -*- coding: utf-8 -*-
"""
HypoDD PHA read support.

:copyright:
    The ObsPy Development Team (devs@obspy.org)
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import io
from math import cos
from numpy import deg2rad
from warnings import warn

from obspy import UTCDateTime
from obspy.core.event import (
    Catalog, Event, Origin, Magnitude, Pick, WaveformStreamID, Arrival,
    OriginQuality)
from obspy.core.inventory.util import (
    _add_resolve_seedid_doc, _add_resolve_seedid_ph2comp_doc, _resolve_seedid)


DEG2KM = 111.2


[docs]def _block2event(block, eventid_map, **kwargs): """ Read HypoDD event block """ lines = block.strip().splitlines() yr, mo, dy, hr, mn, sc, la, lo, dp, mg, eh, ez, rms, id_ = lines[0].split() if eventid_map is not None and id_ in eventid_map: id_ = eventid_map[id_] time = UTCDateTime(int(yr), int(mo), int(dy), int(hr), int(mn), float(sc), strict=False) laterr = None if float(eh) == 0 else float(eh) / DEG2KM lonerr = (None if laterr is None or float(la) > 89 else laterr / cos(deg2rad(float(la)))) ez = None if float(ez) == 0 else float(ez) * 1000 rms = None if float(rms) == 0 else float(rms) picks = [] arrivals = [] for line in lines[1:]: sta, reltime, weight, phase = line.split() widargs = _resolve_seedid(sta, '', time=time, phase=phase, **kwargs) wid = WaveformStreamID(*widargs) pick = Pick(waveform_id=wid, phase_hint=phase, time=time + float(reltime)) arrival = Arrival(phase=phase, pick_id=pick.resource_id, time_weight=float(weight)) picks.append(pick) arrivals.append(arrival) qu = OriginQuality(associated_phase_count=len(picks), standard_error=rms) origin = Origin(arrivals=arrivals, resource_id="smi:local/origin/" + id_, quality=qu, latitude=float(la), longitude=float(lo), depth=1000 * float(dp), latitude_errors=laterr, longitude_errors=lonerr, depth_errors=ez, time=time) if mg.lower() == 'nan': magnitudes = [] preferred_magnitude_id = None else: magnitude = Magnitude(mag=mg, resource_id="smi:local/magnitude/" + id_) magnitudes = [magnitude] preferred_magnitude_id = magnitude.resource_id event = Event(resource_id="smi:local/event/" + id_, picks=picks, origins=[origin], magnitudes=magnitudes, preferred_origin_id=origin.resource_id, preferred_magnitude_id=preferred_magnitude_id) return event
[docs]def _is_pha(filename): try: with open(filename, 'rb') as f: line = f.readline() assert line.startswith(b'#') assert len(line.split()) == 15 yr, mo, dy, hr, mn, sc = line.split()[1:7] UTCDateTime(int(yr), int(mo), int(dy), int(hr), int(mn), float(sc), strict=False) except Exception: return False else: return True
[docs]@_add_resolve_seedid_ph2comp_doc @_add_resolve_seedid_doc def _read_pha(filename, eventid_map=None, encoding='utf-8', **kwargs): """ Read a HypoDD PHA file and returns an ObsPy Catalog object. .. warning:: This function should NOT be called directly, it registers via the ObsPy :func:`~obspy.core.event.read_events` function, call this instead. :param str filename: File or file-like object in text mode. :param dict ph2comp: mapping of phases to components (default: {'P': 'Z', 'S': 'N'}) :param dict eventid_map: Desired mapping of hypodd event ids (dict values) to event resource ids (dict keys). The returned dictionary of the HYPODDPHA writing operation can be used. By default, ids are not mapped. :param str encoding: encoding used (default: utf-8) :rtype: :class:`~obspy.core.event.Catalog` :return: An ObsPy Catalog object. """ if eventid_map is not None: eventid_map = {v: k for k, v in eventid_map.items()} with io.open(filename, 'r', encoding=encoding) as f: text = f.read() events = [_block2event(block, eventid_map, **kwargs) for block in text.split('#')[1:]] return Catalog(events)
PHA1 = ('# {o.time.year} {o.time.month:>2} {o.time.day:>2} {o.time.hour:>2}' ' {o.time.minute:>2} {o.time.second:>2}.{o.time.microsecond:06d} ' '{o.latitude} {o.longitude} {depth} {mag} {he} {ve} {rms}' ' {evid:>9}\n') PHA2 = '{p.waveform_id.station_code:6} {relt:.4f} {weight} {p.phase_hint}\n'
[docs]def _map_eventid(evid, eventid_map, used_ids, counter): idpha = evid if evid in eventid_map: idpha = eventid_map[evid] if not idpha.isdigit() or len(idpha) > 9: msg = ('Invalid value in eventid_map, pha event id has to be ' 'digit with max 9 digits') raise ValueError(msg) return idpha if not idpha.isdigit(): idpha = ''.join(char for char in idpha if char.isdigit()) if len(idpha) > 9: idpha = idpha[:9] while idpha == '' or idpha in used_ids: idpha = str(counter[0]) counter[0] += 1 if idpha != evid: eventid_map[evid] = idpha used_ids.add(idpha) return idpha
[docs]def _write_pha(catalog, filename, eventid_map=None, **kwargs): # @UnusedVariable """ Write a HypoDD PHA file. .. warning:: This function should NOT be called directly, it registers via the the :meth:`~obspy.core.event.Catalog.write` method of an ObsPy :class:`~obspy.core.event.Catalog` object, call this instead. :type catalog: :class:`~obspy.core.event.catalog.Catalog` :param catalog: The ObsPy Catalog object to write. :type filename: str or file-like object :param filename: Filename to write or open file-like object. :param dict eventid_map: Desired mapping of event resource ids (dict keys) to hypodd event ids (dict values). HYPODD expects integer event ids with maximal 9 digits. If the event resource id is not present in the mapping, the event resource id is stripped of all non-digit characters and truncated to a length of 9 chars. If this method does not generate a valid hypodd event id, a counter starting at 1000 is used. :returns: Dictionary eventid_map with mapping of event resource id to hypodd event id. Items are only present if both ids are different. """ if len(catalog) >= 10**10: warn('Writing a very large catalog will use event ids that might not ' 'be readable by HypoDD.') lines = [] if eventid_map is None: eventid_map = {} args_map_eventid = (eventid_map, set(eventid_map.values()), [1]) for event in catalog: try: ori = event.preferred_origin() or event.origins[0] except IndexError: warn(f'Skipping writing event with missing origin: {event}') continue try: mag = event.preferred_magnitude() or event.magnitudes[0] except IndexError: warn('Missing magnitude will be set to 0.0') mag = 0. else: mag = mag.mag evid = event.resource_id.id evid = evid.split('/')[-1] if '/' in evid else evid evid = _map_eventid(evid, *args_map_eventid) rms = (ori.quality.standard_error if 'quality' in ori and ori.quality else None) rms = rms if rms is not None else 0.0 he1 = ori.latitude_errors.uncertainty if ori.latitude_errors else None he2 = (ori.longitude_errors.uncertainty if ori.longitude_errors else None) shortening = cos(deg2rad(ori.latitude)) he = max(0. if he1 is None else he1 * DEG2KM, 0. if he2 is None else he2 * DEG2KM * shortening) ve = ori.depth_errors.uncertainty if ori.depth_errors else None ve = 0. if ve is None else ve / 1000 line = PHA1.format(o=ori, depth=ori.depth / 1000, mag=mag, he=he, ve=ve, rms=rms, evid=evid) lines.append(line) weights = {str(arrival.pick_id): arrival.time_weight for arrival in ori.arrivals if arrival.time_weight} for pick in event.picks: weight = weights.get(str(pick.resource_id), 1.) line = PHA2.format(p=pick, relt=pick.time - ori.time, weight=weight) lines.append(line) data = ''.join(lines) try: with open(filename, 'w') as fh: fh.write(data) except TypeError: filename.write(data) return None if len(eventid_map) == 0 else eventid_map
if __name__ == '__main__': import doctest doctest.testmod(exclude_empty=True)