Source code for obspy.io.zmap.core

# -*- coding: utf-8 -*-
"""
ZMAP read/write support.

ZMAP is a simple 10 column CSV file (technically TSV) format for basic catalog
data (see [Wiemer2001]_).

:copyright:
    The ObsPy Development Team (devs@obspy.org), Lukas Heiniger
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from future.builtins import *  # NOQA

import math

from obspy.core import UTCDateTime
from obspy.core.event import (Catalog, Event, Magnitude, Origin,
                              OriginUncertainty)
from obspy.core.util.decorator import map_example_filename


_STD_ZMAP_COLUMNS = ('lon', 'lat', 'year', 'month', 'day', 'mag', 'depth',
                     'hour', 'minute', 'second')
_EXT_ZMAP_COLUMNS = ('h_err', 'z_err', 'm_err')


[docs]class Pickler(object):
[docs] def __init__(self, with_uncertainties=False): # This is ZMAP column order, don't change it zmap_columns = _STD_ZMAP_COLUMNS if with_uncertainties: zmap_columns += _EXT_ZMAP_COLUMNS self.zmap_columns = zmap_columns
[docs] def dump(self, catalog, filename): """ Writes ObsPy Catalog into given file. :type catalog: :class:`~obspy.core.event.Catalog` :param catalog: ObsPy Catalog object. :type filename: str or file :param filename: Target file name or open file-like object. """ # Open filehandler or use an existing file like object. if not hasattr(filename, "write"): file_opened = True fh = open(filename, "wb") else: file_opened = False fh = filename try: cat_string = self._serialize(catalog) fh.write(cat_string.encode('utf-8')) finally: if file_opened: fh.close()
[docs] def dumps(self, catalog): """ Returns ZMAP string of given ObsPy Catalog object. :type catalog: :class:`~obspy.core.event.Catalog` :param catalog: ObsPy Catalog object. :rtype: str :returns: ZMAP formatted string. """ return self._serialize(catalog)
@staticmethod
[docs] def _hz_error(origin): """ Compute horizontal error of origin. If the origin has an associated origin uncertainty object, we will try to extract the horizontal uncertainty from there. Otherwise we compute it from the individual lat/lon uncertainties stored in origin. """ ou = origin.origin_uncertainty # TODO: implement the other two possible descriptions if ou and ou.preferred_description == 'horizontal uncertainty': return ou.horizontal_uncertainty else: km_per_deg = math.pi * 6371.0087714 / 180.0 lat_err = origin.latitude_errors.uncertainty lon_err = origin.longitude_errors.uncertainty if lat_err is None or lon_err is None: return None d_lat = lat_err d_lon = lon_err * math.cos(origin.latitude * math.pi / 180.0) h_err = km_per_deg * math.hypot(d_lat, d_lon) return h_err
@staticmethod
[docs] def _depth_error(origin): """ Return the absolute depth error in km """ # TODO: add option to extract depth error from origin uncertainty # when ellipsoid is used if origin.depth_errors.uncertainty is None: return None return origin.depth_errors.uncertainty / 1000.0
@staticmethod
[docs] def _num2str(num, precision=6): """ Convert num into a matlab (and thus zmap) compatible string """ if num is None: return 'NaN' else: return '{n:.{p}f}'.format(p=precision, n=num)
@staticmethod
[docs] def _decimal_year(time): """ Return (floating point) decimal year representation of UTCDateTime input value """ start_of_year = UTCDateTime(time.year, 1, 1).timestamp end_of_year = UTCDateTime(time.year + 1, 1, 1).timestamp timestamp = time.timestamp year_fraction = ((timestamp - start_of_year) / (end_of_year - start_of_year)) return time.year + year_fraction
[docs] def _serialize(self, catalog): rows = [] for ev in catalog: strings = dict.fromkeys(self.zmap_columns, 'NaN') # origin origin = ev.preferred_origin() if origin is None and ev.origins: origin = ev.origins[0] if origin: dec_year = self._decimal_year(origin.time) dec_second = origin.time.second + \ origin.time.microsecond / 1e6 strings.update({ 'depth': self._num2str(origin.depth / 1000.0), # m to km 'z_err': self._num2str(self._depth_error(origin)), 'lat': self._num2str(origin.latitude), 'lon': self._num2str(origin.longitude), 'h_err': self._num2str(self._hz_error(origin)), 'year': self._num2str(dec_year, 12), 'month': self._num2str(origin.time.month, 0), 'day': self._num2str(origin.time.day, 0), 'hour': self._num2str(origin.time.hour, 0), 'minute': self._num2str(origin.time.minute, 0), 'second': str(dec_second) }) # magnitude magnitude = ev.preferred_magnitude() if magnitude is None and ev.magnitudes: magnitude = ev.magnitudes[0] if magnitude: strings.update({ 'mag': self._num2str(magnitude.mag), 'm_err': self._num2str(magnitude.mag_errors.uncertainty) }) # create tab separated row rows.append('\t'.join([strings[c] for c in self.zmap_columns])) zmap = '\n'.join(rows) + '\n' return zmap
[docs]class Unpickler(object):
[docs] def load(self, filename): """ Returns an ObsPy Catalog object from a ZMAP file. :type filename: str or file :param filename: Source file name or open file-like object. :rtype: :class:`~obspy.core.event.Catalog` :returns: ObsPy catalog """ # Open filehandler or use an existing file like object. if not hasattr(filename, "read"): file_opened = True fh = open(filename, 'rb') else: file_opened = False fh = filename try: zmap_str = fh.read() if hasattr(zmap_str, 'decode'): zmap_str = zmap_str.decode('utf-8') catalog = self._deserialize(zmap_str) return catalog finally: if file_opened: fh.close()
[docs] def loads(self, zmap_str): """ Returns an ObsPy Catalog object from a ZMAP string. :type zmap_str: str :param zmap_str: ObsPy Catalog object. :rtype: :class:`~obspy.core.event.Catalog` :returns: ObsPy catalog """ return self._deserialize(zmap_str)
[docs] def _deserialize(self, zmap_str): catalog = Catalog() for row in zmap_str.split('\n'): if len(row) == 0: continue origin = Origin() event = Event(origins=[origin]) event.preferred_origin_id = origin.resource_id.id # Begin value extraction columns = row.split('\t', 13)[:13] # ignore extra columns values = dict(zip(_STD_ZMAP_COLUMNS + _EXT_ZMAP_COLUMNS, columns)) # Extract origin origin.longitude = self._str2num(values.get('lon')) origin.latitude = self._str2num(values.get('lat')) depth = self._str2num(values.get('depth')) if depth is not None: origin.depth = depth * 1000.0 z_err = self._str2num(values.get('z_err')) if z_err is not None: origin.depth_errors.uncertainty = z_err * 1000.0 h_err = self._str2num(values.get('h_err')) if h_err is not None: ou = OriginUncertainty() ou.horizontal_uncertainty = h_err ou.preferred_description = 'horizontal uncertainty' origin.origin_uncertainty = ou year = self._str2num(values.get('year')) if year is not None: t_fields = ['year', 'month', 'day', 'hour', 'minute', 'second'] comps = [self._str2num(values.get(f)) for f in t_fields] if year % 1 != 0: origin.time = self._decyear2utc(year) elif any(v > 0 for v in comps[1:]): # no seconds involved if len(comps) < 6: utc_args = [int(v) for v in comps if v is not None] # we also have to handle seconds else: utc_args = [int(v) if v is not None else 0 for v in comps[:-1]] # just leave float seconds as is utc_args.append(comps[-1]) origin.time = UTCDateTime(*utc_args) mag = self._str2num(values.get('mag')) # Extract magnitude if mag is not None: magnitude = Magnitude(mag=mag) m_err = self._str2num(values.get('m_err')) magnitude.mag_errors.uncertainty = m_err event.magnitudes.append(magnitude) event.preferred_magnitude_id = magnitude.resource_id.id event.scope_resource_ids() catalog.append(event) return catalog
@staticmethod
[docs] def _str2num(num_str): try: if num_str is not None and num_str.lower() != 'nan': num = float(num_str) else: return None except ValueError: return None return num
@staticmethod
[docs] def _decyear2utc(decimal_year): """ Return UTCDateTime from decimal year """ start_of_year = UTCDateTime(int(decimal_year), 1, 1) end_of_year = UTCDateTime(int(decimal_year) + 1, 1, 1) t = start_of_year + (decimal_year % 1) * (end_of_year - start_of_year) return t
[docs]def _write_zmap(catalog, filename, with_uncertainties=False, **kwargs): # @UnusedVariable """ Writes a ZMAP file. Extended ZMAP format as used in CSEP (http://www.cseptesting.org) is supported by using keyword argument with_uncertainties=True (default: False) .. warning:: This function should NOT be called directly, it registers via the the :meth:`~obspy.core.event.Catalog.write` method of an ObsPy :class:`~obspy.core.event.Catalog` object, call this instead. :type catalog: :class:`~obspy.core.event.Catalog` :param catalog: The ObsPy Catalog object to write. :type filename: str or file :param filename: Filename to write or open file-like object. :type with_uncertainties: bool :param with_uncertainties: appends non-standard columns for horizontal, magnitude and depth uncertainty (see :mod:`~obspy.io.zmap.core`). """ Pickler(with_uncertainties).dump(catalog, filename)
[docs]def _read_zmap(filename, **kwargs): """ Reads a ZMAP file and returns an ObsPy Catalog object. .. warning:: This function should NOT be called directly, it registers via the ObsPy :func:`~obspy.core.event.read_events` function, call this instead. :type filename: str or file :param filename: ZMAP string, name of ZMAP file to be read or open file-like object. :rtype: :class:`~obspy.core.event.Catalog` :return: An ObsPy Catalog object. """ if not hasattr(filename, 'read'): try: with open(filename): pass except Exception: # we assume it's a string now return Unpickler().loads(filename) return Unpickler().load(filename)
@map_example_filename("filename")
[docs]def _is_zmap(filename): """ Checks whether a file is ZMAP format. Unlike :func:`~obspy.io.zmap.core._read_zmap` *_is_zmap* is strict, i.e. it will not detect a ZMAP file unless it consists of exactly 10 or 13 numerical columns. :type filename: str or file :param filename: ZMAP string, name of the file to be checked or open file-like object. :rtype: bool :return: ``True`` if ZMAP file. .. rubric:: Example >>> _is_zmap('/path/to/zmap_events.txt') True """ if all(hasattr(filename, attr) for attr in ['tell', 'seek', 'read']): # we got an open file pos = filename.tell() filename.seek(0) first_line = filename.readline() filename.seek(pos) if hasattr(first_line, 'decode'): first_line = first_line.decode() else: try: with open(filename, 'rb') as f: first_line = f.readline().decode() except Exception: try: first_line = filename.decode() except Exception: first_line = str(filename) line_ending = first_line.find("\n") if line_ending == -1: return False first_line = first_line[:line_ending] # we expect 10 (standard) or 13 columns (extended) columns = first_line.split('\t') if len(columns) not in [10, 13]: return False # only numerical values are allowed (including NaN) try: [float(col) for col in columns] return True except ValueError: return False
if __name__ == '__main__': import doctest doctest.testmod(exclude_empty=True)