Source code for obspy.io.csv.core

"""
.. note::
    This module can easily be extended to write and read more event
    information. If you are interested, please send a PR to the GitHub
    repository.

      1. Add an 'extended' or similar key to the FIELDS dict, e.g. as a start
         use::

             'extended': (
                 '{time!s:.25} {lat:.6f} {lat_err:.6f} {lon:.6f} {lon_err:.6f} '
                 '{dep:.3f} {dep_err:.3f} {mag:.2f} {mag_err:.2f} {magtype} {id}'
                 )

         You can also test by simply passing the string to the ``fields``
         option (see the sketch below the FIELDS definition).
      2. Implement the writing functionality in write_csv by adding the new
         properties to the dict d. Missing values have to be handled
         (see the sketch below _write_csv).
      3. Implement the reading functionality for the new properties in
         read_csv.
      4. Write tests covering all new properties. Check that an event with
         all new properties defined or missing can be written and read again.

    The same can be done for picks using PFIELDS, _write_picks and
    _read_picks.
"""

import csv
from contextlib import contextmanager
import io
import math
import pathlib
from string import Formatter
from warnings import warn
import zipfile


import numpy as np
from obspy import UTCDateTime as UTC, __version__
from obspy.core import event as evmod
from obspy.core.util.decorator import map_example_filename


# for reading
DEFAULT = {'magtype': ''}
# for writing
FIELDS = {
    'basic': (
        '{id} {time!s:.25} {lat:.6f} {lon:.6f} {dep:.3f} '
        '{magtype} {mag:.2f}'),
    'eventtxt': (
        '{id},{time!s:.25},{lat:.6f},{lon:.6f},{dep:.3f},{author},,{contrib},,'
        '{magtype},{mag:.2f},{magauthor},{region}').split(',')
}
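
# Sketch (not part of the module API): instead of one of the keys above, a
# custom format string using the same placeholder names can be passed directly
# to the ``fields`` option, e.g. via Catalog.write (file name is hypothetical):
#     events.write('out.csv', 'CSV', fields='{id} {time!s:.25} {mag:.2f}')
# A hypothetical 'extended' entry (step 1 of the module note) would be added
# here as just another key of FIELDS.
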
PFIELDS = {
    'basic': '{seedid} {phase} {time:.5f} {weight:.3f}'
}
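
# With the 'basic' pick fields and the default delimiter of _write_picks, a
# pick file starts with the header ``seedid,phase,time,weight`` followed by
# lines like (values are made up):
#     XX.STA01..HHZ,P,1.23456,1.000
# where ``time`` is the pick time in seconds relative to the origin time.
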
CSZ_COMMENT = f'CSZ format ObsPy v{__version__} obspy_no_uncompress'.encode(
    'utf-8')
# for load_csv
DTYPE = {
    'time': 'datetime64[ms]',
    'lat': float,
    'lon': float,
    'dep': float,
    'mag': float,
    'magtype': 'U10',
    'id': 'U50'
}

# catalog and contribid are not used
EVENTTXT_NAMES = (  # for reading
    'id time lat lon dep author catalog contrib contribid '
    'magtype mag magauthor region')
EVENTTXT_HEADER = (  # for writing
    '#EventID | Time | Latitude | Longitude | Depth/km | Author | Catalog | '
    'Contributor | ContributorID | MagType | Magnitude | MagAuthor | '
    'EventLocationName')


def _is_csv(fname, **kwargs):
    try:
        return _read_csv(fname, format_check=True)
    except Exception:
        return False


def _is_eventtxt(fname, **kwargs):
    try:
        return _read_eventtxt(fname, format_check=True)
    except Exception:
        return False


def _is_csz(fname, **kwargs):
    try:
        if not zipfile.is_zipfile(fname):
            return False
        with zipfile.ZipFile(fname) as zipf:
            if not (zipf.comment.startswith(b'CSZ') and
                    zipf.comment.endswith(b'obspy_no_uncompress')):
                return False
        return True
    except Exception:
        return False


def _evid(event):
    return str(event.resource_id).split('/')[-1]


def _origin(event):
    return event.preferred_origin() or event.origins[0]


@contextmanager
def _open(filein, *args, **kwargs):
    """Accept files or file names or pathlib objects"""
    if isinstance(filein, (str, pathlib.PurePath)):  # filename
        with open(filein, *args, **kwargs) as f:
            yield f
    else:  # file-like object
        yield filein


def _names_sequence(names):
    if isinstance(names, dict):
        names = [names.get(i, '_') for i in range(max(names.keys()) + 1)]
    elif ' ' in names:
        names = names.split()
    return names


def _string(row, key):
    if key in row and row[key].strip() != '':
        return row[key].strip()


def _read_csv(fname, skipheader=0, default=None, names=None,
              format_check=False, **kwargs):
    """
    Read a CSV file and return ObsPy catalog

    :param fname: filename or file-like object to read from
    :param skipheader: skip first rows of file
    :param default: dictionary with default values, at the moment only
        magtype is supported, i.e. to set magtypes use
        ``default={'magtype': 'Ml'}``
    :param names: determined automatically from header line of file,
        otherwise can be specified as string, sequence or dict
    :param format_check: Check if the first event can be read
    :param **kwargs: all other kwargs are passed to csv.DictReader,
        important additional arguments are fieldnames, dialect, delimiter,
        etc.

    Example
    -------
    >>> from obspy import read_events
    >>> events = read_events('/path/to/catalog.csv')
    >>> print(events)  # doctest: +NORMALIZE_WHITESPACE
    3 Event(s) in Catalog:
    2012-04-04T14:21:42.300000Z | +41.818, +79.689 | 4.4 mb
    2012-04-04T14:18:37.000000Z | +39.342, +41.044 | 4.3 ML
    2012-04-04T14:08:46.000000Z | +38.017, +37.736 | 3.0 ML

    You can read CSV files created by this module or external CSV files.
    Example reading an external CSV file:

    >>> from obspy import read_events
    >>> names = 'year mon day hour minu sec _ lat lon dep mag id'
    >>> events = read_events(
    ...     '/path/to/external.csv', 'CSV', skipheader=1, names=names)
    >>> print(events)  # doctest: +NORMALIZE_WHITESPACE
    1 Event(s) in Catalog:
    2023-05-06T19:55:01.300000Z | +10.194, +124.830 | 0.2
    """
    if default is None:
        default = DEFAULT
    events = []
    with _open(fname) as f:
        for _ in range(skipheader):
            f.readline()
        if names is not None:
            kwargs.setdefault('fieldnames', _names_sequence(names))
        reader = csv.DictReader(f, **kwargs)
        for row in reader:
            if 'time' in row:
                time = UTC(row['time'])
            else:
                time = UTC(
                    '{year}-{mon}-{day} {hour}:{minu}:{sec}'.format(**row))
            try:
                if 'depm' in row:
                    dep = float(row['depm'])
                else:
                    dep = float(row['dep']) * 1000
                if math.isnan(dep):
                    raise
            except Exception:
                dep = None
            author = _string(row, 'author')
            contrib = _string(row, 'contrib')
            if author is not None or contrib is not None:
                info = evmod.CreationInfo(author=author, agency_id=contrib)
            else:
                info = None
            origin = evmod.Origin(
                time=time,
                latitude=row['lat'],
                longitude=row['lon'],
                depth=dep,
                creation_info=info
            )
            try:
                # add zero to eliminate negative zeros in magnitudes
                mag = float(row['mag']) + 0
                if math.isnan(mag):
                    raise
            except Exception:
                magnitudes = []
            else:
                try:
                    magtype = row['magtype']
                    if magtype.lower() in ('', 'none', 'null', 'nan'):
                        raise
                except Exception:
                    magtype = default.get('magtype')
                magauthor = _string(row, 'magauthor')
                info = (evmod.CreationInfo(author=magauthor)
                        if magauthor else None)
                magnitudes = [evmod.Magnitude(
                    mag=mag, magnitude_type=magtype, creation_info=info)]
            try:
                id_ = evmod.ResourceIdentifier(row['id'].strip())
            except Exception:
                id_ = None
            region = _string(row, 'region')
            descs = ([evmod.EventDescription(region, 'region name')]
                     if region else [])
            event = evmod.Event(
                magnitudes=magnitudes,
                origins=[origin],
                resource_id=id_,
                event_descriptions=descs
            )
            events.append(event)
            if format_check:
                return True
    if format_check:
        # empty file will return an empty catalog,
        # but it is not detected as CSV file
        return False
    return evmod.Catalog(events=events)


def _write_csv(events, fname, fields='basic', depth_in_km=True,
               delimiter=',', header=None):
    """
    Write ObsPy catalog to CSV file

    :param events: catalog or list of events
    :param fname: filename or file-like object to write to
    :param fields: set format and header names of CSV file, see
        :const:`obspy.io.csv.FIELDS`, you can use your own format string
        here, just make sure to use the pre-defined header names
    :param depth_in_km: write depth in units of kilometer (default: True)
        or meter
    :param delimiter: defaults to ``','``, if the delimiter is changed,
        ObsPy's read_events function will not automatically identify the
        file as a CSV file
    :param header: Use a non-default header row

    .. warning::
        If the parameters ``delimiter`` or ``header`` are changed, ObsPy's
        read_events function will not automatically identify the file as a
        CSV file.

    Example
    -------
    >>> from obspy import read_events
    >>> events = read_events()  # get example catalog
    >>> events.write('local_catalog.csv', 'CSV')  # declare 'CSV' as format
    >>> with open('local_catalog.csv') as f: print(f.read())
    id,time,lat,lon,dep,magtype,mag
    20120404_0000041,2012-04-04T14:21:42.30000,41.818000,79.689000,1.000,mb,4.40
    20120404_0000038,2012-04-04T14:18:37.00000,39.342000,41.044000,14.400,ML,4.30
    20120404_0000039,2012-04-04T14:08:46.00000,38.017000,37.736000,7.000,ML,3.00
    <BLANKLINE>
    """
    fields = FIELDS.get(fields, fields)
    if ' ' in fields:
        fields = fields.split()
    fmtstr = delimiter.join(fields)
    if not depth_in_km and 'depm' not in fmtstr:
        fmtstr = fmtstr.replace('dep', 'depm')
    if header is None:
        fieldnames = [
            fn for _, fn, _, _ in Formatter().parse(fmtstr)
            if fn is not None]
        header = delimiter.join(fieldnames)
    with _open(fname, 'w') as f:
        f.write(header + '\n')
        for event in events:
            evid = str(event.resource_id).split('/')[-1]
            try:
                origin = _origin(event)
            except Exception:
                warn(f'No origin found -> do not write event {evid}')
                continue
            try:
                author = origin.creation_info.author
            except AttributeError:
                author = ''
            try:
                contrib = origin.creation_info.agency_id
            except AttributeError:
                contrib = ''
            try:
                magnitude = event.preferred_magnitude() or event.magnitudes[0]
            except Exception:
                warn(f'No magnitude found for event {evid}')
                mag = float('nan')
                magtype = ''
                magauthor = ''
            else:
                mag = magnitude.mag
                magtype = magnitude.magnitude_type or ''
                try:
                    magauthor = magnitude.creation_info.author
                except AttributeError:
                    magauthor = ''
            try:
                dep = origin.depth / (1000 if depth_in_km else 1)
            except Exception:
                warn(f'No depth set for event {evid}')
                dep = float('nan')
            for description in event.event_descriptions:
                if str(description.type) == 'region name':
                    region = description.text
                    break
            else:
                region = ''
            d = {'time': origin.time,
                 'lat': origin.latitude,
                 'lon': origin.longitude,
                 'dep' if depth_in_km else 'depm': dep,
                 'mag': mag,
                 'magtype': magtype,
                 'id': evid,
                 'author': author,
                 'contrib': contrib,
                 'magauthor': magauthor,
                 'region': region,
                 }
            f.write(fmtstr.format(**d).replace('nan', '') + '\n')


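# Sketch for step 2 of the module note (hypothetical, not part of this
# module): a new placeholder such as ``lat_err`` in an 'extended' FIELDS entry
# would need a corresponding entry in the dict ``d`` above, with missing
# values mapped to nan so that they end up as empty fields in the output
# (the 'nan' text is stripped before writing):
#
#     lat_err = getattr(origin.latitude_errors, 'uncertainty', None)
#     d['lat_err'] = float('nan') if lat_err is None else lat_err

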
@map_example_filename('fname')
def load_csv(fname, skipheader=0, only=None, names=None, delimiter=',',
             **kw):
    """
    Load CSV or CSZ file into numpy array

    :param skipheader: see :func:`_read_csv`
    :param names: see :func:`_read_csv`
    :param only: sequence, read only columns specified by name
    :param **kw: Other kwargs are passed to :func:`numpy.genfromtxt`

    For an example see :mod:`obspy.io.csv`.
    """
    if isinstance(fname, str) and zipfile.is_zipfile(fname):
        # CSZ file: load the bundled events.csv from the zip archive
        with zipfile.ZipFile(fname) as zipf:
            with io.TextIOWrapper(
                    zipf.open('events.csv'), encoding='utf-8') as f:
                return load_csv(f)
    with _open(fname) as f:
        for _ in range(skipheader):
            f.readline()
        if names is None:
            names = f.readline().strip().split(',')
        names = _names_sequence(names)
        dtype = [(n, DTYPE[n]) for n in names
                 if n in DTYPE and (only is None or n in only)]
        usecols = [i for i, n in enumerate(names)
                   if n in DTYPE and (only is None or n in only)]
        kw.setdefault('usecols', usecols)
        kw.setdefault('dtype', dtype)
        return np.genfromtxt(f, delimiter=delimiter, **kw)


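# Usage sketch (file name is hypothetical): read only selected columns into a
# structured numpy array and access them by name:
#
#     data = load_csv('catalog.csv', only=('lat', 'lon', 'mag'))
#     print(data['mag'])

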
def _events2array(events, **kw):
    """
    Convert ObsPy catalog to numpy array

    All kwargs are passed to :func:`load_csv`, e.g. use
    ``only=('lat', 'lon', 'mag')`` to get an array with lat, lon, mag
    parameters.

    Example
    -------
    >>> from obspy import read_events
    >>> from obspy.io.csv import _events2array
    >>> events = read_events()
    >>> t = _events2array(events)
    >>> print(t.dtype.names)
    ('id', 'time', 'lat', 'lon', 'dep', 'magtype', 'mag')
    >>> print(t)  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    [ ('20120404_0000041', '2012-04-04...', 41.818, 79.689,  1. , 'mb', 4.4)
      ('20120404_0000038', '2012-04-04T...', 39.342, 41.044, 14.4, 'ML', 4.3)
      ('20120404_0000039', '2012-04-04T...', 38.017, 37.736,  7. , 'ML', 3. )]
    >>> print(t['mag'])
    [ 4.4  4.3  3. ]
    """
    with io.StringIO() as f:
        _write_csv(events, f)
        f.seek(0)
        return load_csv(f, **kw)


def _read_eventtxt(fname, default=None, format_check=False):
    """
    Read EVENTTXT file and return ObsPy catalog

    :param fname: filename or file-like object to read from
    :param default: dictionary with default values, at the moment only
        magtype is supported, i.e. to set magtypes use
        ``default={'magtype': 'Ml'}``
    :param format_check: Check if the first event can be read

    For an example see :mod:`obspy.io.csv`.
    """
    return _read_csv(fname, skipheader=1, names=EVENTTXT_NAMES,
                     delimiter='|', default=default,
                     format_check=format_check)


def _write_eventtxt(events, fname):
    """
    Write ObsPy catalog to EVENTTXT file

    :param events: catalog or list of events
    :param fname: filename or file-like object to write to

    For an example see :mod:`obspy.io.csv`.
    """
    return _write_csv(events, fname, fields='eventtxt',
                      header=EVENTTXT_HEADER, delimiter='|')


@map_example_filename('fname')
def load_eventtxt(fname, **kw):
    """
    Load EVENTTXT file into numpy array

    For possible arguments see :func:`load_csv`.
    For an example see :mod:`obspy.io.csv`.
    """
    kw.setdefault('delimiter', '|')
    kw.setdefault('skipheader', 1)
    return load_csv(fname, names=EVENTTXT_NAMES, **kw)


def _read_picks(event, fname):
    """
    Read picks from CSV file and add them to event
    """
    otime = _origin(event).time
    picks = []
    arrivals = []
    with _open(fname) as f:
        reader = csv.DictReader(f)
        for row in reader:
            phase = row['phase']
            seedid = row['seedid']
            wid = (evmod.WaveformStreamID(seed_string=seedid)
                   if seedid else None)
            pick = evmod.Pick(waveform_id=wid, phase_hint=phase,
                              time=otime + float(row['time']))
            arrival = evmod.Arrival(phase=phase, pick_id=pick.resource_id,
                                    time_weight=float(row['weight']))
            picks.append(pick)
            arrivals.append(arrival)
    event.picks = picks
    event.origins[0].arrivals = arrivals


def _write_picks(event, fname, fields_picks='basic', delimiter=','):
    """
    Write picks from event to a CSV file
    """
    fields = PFIELDS.get(fields_picks, fields_picks)
    if ' ' in fields:
        fields = fields.split()
    fmtstr = delimiter.join(fields)
    fieldnames = [
        fn for _, fn, _, _ in Formatter().parse(fmtstr) if fn is not None]
    origin = _origin(event)
    weights = {str(arrival.pick_id): arrival.time_weight
               for arrival in origin.arrivals if arrival.time_weight}
    phases = {str(arrival.pick_id): arrival.phase
              for arrival in origin.arrivals if arrival.phase}
    with _open(fname, 'w') as f:
        f.write(delimiter.join(fieldnames) + '\n')
        for pick in event.picks:
            pick_id = str(pick.resource_id)
            try:
                seedid = pick.waveform_id.id
                if seedid is None:
                    raise AttributeError
            except AttributeError:
                warn(f'No waveform id found for pick {pick_id}')
                seedid = ''
            d = {'time': pick.time - origin.time,
                 'seedid': seedid,
                 'phase': phases.get(pick_id, pick.phase_hint),
                 'weight': weights.get(pick_id, 1.)}
            f.write(fmtstr.format(**d) + '\n')


def _read_csz(fname, default=None):
    """
    Read a CSZ file and return ObsPy catalog with picks

    :param fname: filename or file-like object to read from
    :param default: dictionary with default values, at the moment only
        magtype is supported, i.e. to set magtypes use
        ``default={'magtype': 'Ml'}``

    For an example see :mod:`obspy.io.csv`.
    """
    with zipfile.ZipFile(fname) as zipf:
        with io.TextIOWrapper(zipf.open('events.csv'),
                              encoding='utf-8') as f:
            events = _read_csv(f, default=default)
        for event in events:
            evid = _evid(event)
            fname = f'picks_{evid}.csv'
            if fname not in zipf.namelist():
                continue
            with io.TextIOWrapper(zipf.open(fname), encoding='utf-8') as f:
                _read_picks(event, f)
    return events


def _write_csz(events, fname, fields='basic', fields_picks='basic',
               **kwargs):
    """
    Write ObsPy catalog to CSZ file

    :param events: catalog or list of events
    :param fname: filename or file-like object to write to
    :param fields: set format and header names of CSV file, see
        :const:`obspy.io.csv.FIELDS`, you can use your own format string
        here, just make sure to use the pre-defined header names
    :param fields_picks: set format and header names of CSV pick files, see
        :const:`obspy.io.csv.PFIELDS`, you can use your own format string
        here, just make sure to use the pre-defined header names
    :param **kwargs: compression and compression level can be specified,
        see the `zipfile documentation
        <https://docs.python.org/library/zipfile.html#zipfile.ZipFile>`_::

            events.write('CSZ', compression=True, compresslevel=9)

    For an example see :mod:`obspy.io.csv`.
    """
    # allow True as value for compression
    if kwargs.get('compression') is True:
        kwargs['compression'] = zipfile.ZIP_DEFLATED
    with zipfile.ZipFile(fname, mode='w', **kwargs) as zipf:
        zipf.comment = CSZ_COMMENT
        with io.StringIO() as f:
            _write_csv(events, f, fields=fields)
            zipf.writestr('events.csv', f.getvalue())
        for event in events:
            if len(event.picks) == 0:
                continue
            evid = str(event.resource_id).split('/')[-1]
            try:
                _origin(event)
            except Exception:
                continue
            with io.StringIO() as f:
                _write_picks(event, f, fields_picks=fields_picks)
                zipf.writestr(f'picks_{evid}.csv', f.getvalue())
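

# Usage sketch for the CSZ round trip (file name is hypothetical):
#
#     from obspy import read_events
#     events = read_events()  # example catalog
#     events.write('catalog.csz', 'CSZ', compression=True, compresslevel=9)
#     events2 = read_events('catalog.csz')
#
# Picks of each event are stored in separate picks_<eventid>.csv files inside
# the zip archive and are attached to the events again on reading.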