
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Helpers for the mass downloader.

Intended to simplify and stabilize the logic of the mass downloader and make
it understandable in the first place.

:copyright:
    Lion Krischer (krischer@geophysik.uni-muenchen.de), 2014-2015
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from future.builtins import *  # NOQA

import collections
import copy
import fnmatch
import itertools
import sys
from multiprocessing.pool import ThreadPool
import os
import time
import timeit

if sys.version_info.major == 2:
    from itertools import ifilterfalse as filterfalse
else:
    from itertools import filterfalse

import numpy as np

from lxml.etree import XMLSyntaxError

import obspy
from obspy.core.util import Enum

from . import utils

# The current status of an entity.
STATUS = Enum(["none", "needs_downloading", "downloaded", "ignore", "exists",
               "download_failed", "download_rejected",
               "download_partially_failed"])


class _SlotsEqualityComparisionObject(object):
    """
    Helper object with an equality comparison method simply comparing all
    slotted attributes.
    """
    __slots__ = []

    def __eq__(self, other):
        if type(self) != type(other):
            return False
        return all([getattr(self, _i) == getattr(other, _i)
                    for _i in self.__slots__])



class Station(_SlotsEqualityComparisionObject):
    """
    Object representing a seismic station within the download helper classes.

    It knows the coordinates of the station to perform the filtering, its
    channels and the filename and status of the StationXML files.

    :param network: The network code.
    :type network: str
    :param station: The station code.
    :type station: str
    :param latitude: The latitude of the station.
    :type latitude: float
    :param longitude: The longitude of the station.
    :type longitude: float
    :param channels: The channels of the station.
    :type channels: list of :class:`~.Channel` objects
    :param stationxml_filename: The filename of the StationXML file.
    :type stationxml_filename: str
    :param stationxml_status: The current status of the station.
    :type stationxml_status: :class:`~.STATUS`
    """
    __slots__ = ["network", "station", "latitude", "longitude", "channels",
                 "_stationxml_filename", "want_station_information",
                 "miss_station_information", "have_station_information",
                 "stationxml_status"]

    def __init__(self, network, station, latitude, longitude, channels,
                 stationxml_filename=None, stationxml_status=None):
        # Station attributes.
        self.network = network
        self.station = station
        self.latitude = latitude
        self.longitude = longitude
        self.channels = channels
        # Station information settings.
        self.stationxml_filename = stationxml_filename
        self.stationxml_status = stationxml_status and STATUS.NONE
        # Internally keep track of which channels and time intervals want
        # station information, which miss station information and which
        # already have some. want_station_information should always be the
        # union of miss and have.
        self.want_station_information = {}
        self.miss_station_information = {}
        self.have_station_information = {}

    @property
    def has_existing_or_downloaded_time_intervals(self):
        """
        Returns True if any of the station's time intervals have status
        "DOWNLOADED" or "EXISTS". Otherwise it returns False, meaning the
        station does not have to be considered anymore.
        """
        status = set()
        for chan in self.channels:
            for ti in chan.intervals:
                status.add(ti.status)
        if STATUS.EXISTS in status or STATUS.DOWNLOADED in status:
            return True
        return False

    @property
    def has_existing_time_intervals(self):
        """
        Returns True if any of the station's time intervals already exist.
        """
        for chan in self.channels:
            for ti in chan.intervals:
                if ti.status == STATUS.EXISTS:
                    return True
        return False

    def remove_files(self, logger, reason):
        """
        Delete all files belonging to this station.

        Only delete files that actually have been downloaded!
        """
        for chan in self.channels:
            for ti in chan.intervals:
                if ti.status != STATUS.DOWNLOADED or not ti.filename:
                    continue
                if os.path.exists(ti.filename):
                    logger.info("Deleting MiniSEED file '%s'. Reason: %s" % (
                        ti.filename, reason))
                    utils.safe_delete(ti.filename)

        if self.stationxml_status == STATUS.DOWNLOADED and \
                self.stationxml_filename and \
                os.path.exists(self.stationxml_filename):
            logger.info("Deleting StationXML file '%s'. Reason: %s" %
                        (self.stationxml_filename, reason))
            utils.safe_delete(self.stationxml_filename)

    @property
    def stationxml_filename(self):
        return self._stationxml_filename

    @stationxml_filename.setter
    def stationxml_filename(self, value):
        """
        Setter creating the directory for the file if it does not already
        exist.
        """
        self._stationxml_filename = value
        if not value:
            return
        dirname = os.path.dirname(value)
        if not os.path.exists(dirname):
            os.makedirs(dirname)

    @property
    def temporal_bounds(self):
        """
        Return the temporal bounds for the station.
        """
        starttimes = []
        endtimes = []
        for channel in self.channels:
            s, e = channel.temporal_bounds
            starttimes.append(s)
            endtimes.append(e)
        return min(starttimes), max(endtimes)

    def __str__(self):
        channels = "\n".join(str(i) for i in self.channels)
        channels = "\n\t".join(channels.splitlines())
        return (
            "Station '{network}.{station}' [Lat: {lat:.2f}, Lng: {lng:.2f}]\n"
            "\t-> Filename: {filename} ({status})\n"
            "\t-> Wants station information for channels: {want}\n"
            "\t-> Has station information for channels: {has}\n"
            "\t-> Misses station information for channels: {miss}\n"
            "\t{channels}"
        ).format(
            network=self.network,
            station=self.station,
            lat=self.latitude,
            lng=self.longitude,
            filename=self.stationxml_filename,
            status="exists" if (self.stationxml_filename and os.path.exists(
                self.stationxml_filename)) else "does not yet exist",
            want=", ".join(["%s.%s" % (_i[0], _i[1]) for _i in
                            self.want_station_information.keys()]),
            has=", ".join(["%s.%s" % (_i[0], _i[1]) for _i in
                           self.have_station_information.keys()]),
            miss=", ".join(["%s.%s" % (_i[0], _i[1]) for _i in
                            self.miss_station_information.keys()]),
            channels=channels)

    def prepare_stationxml_download(self, stationxml_storage, logger):
        """
        Figure out what to download.

        :param stationxml_storage:
        """
        # Determine what channels actually want to have station information.
        # This will be a tuple of location code, channel code, starttime,
        # and endtime.
        self.want_station_information = {}
        for channel in self.channels:
            if channel.needs_station_file is False:
                continue
            self.want_station_information[
                (channel.location, channel.channel)] = channel.temporal_bounds

        # No channel has any data, thus nothing will happen.
        if not self.want_station_information:
            self.stationxml_status = STATUS.NONE
            return

        # Only those channels that now actually want station information
        # will be treated in the following.
        s, e = self.temporal_bounds
        storage = utils.get_stationxml_filename(
            stationxml_storage, self.network, self.station,
            list(self.want_station_information.keys()),
            starttime=s, endtime=e)

        # The simplest case. The function returns a string. Now two things
        # can happen.
        if isinstance(storage, (str, bytes)):
            filename = storage
            self.stationxml_filename = filename
            # 1. The file does not yet exist. Thus all channels must be
            # downloaded.
            if not os.path.exists(filename):
                self.miss_station_information = \
                    copy.deepcopy(self.want_station_information)
                self.have_station_information = {}
                self.stationxml_status = STATUS.NEEDS_DOWNLOADING
                return
            # 2. The file does exist. It will be parsed. If it contains ALL
            # necessary information, nothing will happen. Otherwise it will
            # be overwritten.
            else:
                info = utils.get_stationxml_contents(filename)
                for c_id, times in self.want_station_information.items():
                    # Get the temporal range of information in the file.
                    c_info = [_i for _i in info if
                              _i.network == self.network and
                              _i.station == self.station and
                              _i.location == c_id[0] and
                              _i.channel == c_id[1]]
                    if not c_info:
                        break
                    starttime = min([_i.starttime for _i in c_info])
                    endtime = max([_i.endtime for _i in c_info])
                    if starttime > times[0] or endtime < times[1]:
                        break
                # All good if no break is called.
                else:
                    self.have_station_information = \
                        copy.deepcopy(self.want_station_information)
                    self.miss_station_information = {}
                    self.stationxml_status = STATUS.EXISTS
                    return
                # Otherwise everything will be downloaded.
                self.miss_station_information = \
                    copy.deepcopy(self.want_station_information)
                self.have_station_information = {}
                self.stationxml_status = STATUS.NEEDS_DOWNLOADING
                return
        # The other possibility is that a dictionary is returned.
        else:
            # The types are already checked by the get_stationxml_filename()
            # function.
            missing_channels = storage["missing_channels"]
            available_channels = storage["available_channels"]

            # Get the channels wanting station information and filter them.
            channels_wanting_station_information = copy.deepcopy(
                self.want_station_information
            )

            # Figure out what channels are missing and will be downloaded.
            self.miss_station_information = {}
            for channel in missing_channels:
                if channel not in channels_wanting_station_information:
                    continue
                self.miss_station_information[channel] = \
                    channels_wanting_station_information[channel]

            # Same thing but with the already available channels.
            self.have_station_information = {}
            for channel in available_channels:
                if channel not in channels_wanting_station_information:
                    continue
                self.have_station_information[channel] = \
                    channels_wanting_station_information[channel]

            self.stationxml_filename = storage["filename"]

            # Raise a warning if something is missing, but do not raise an
            # exception or halt the program at this point.
            have_channels = set(self.have_station_information.keys())
            miss_channels = set(self.miss_station_information.keys())
            want_channels = set(self.want_station_information.keys())
            if have_channels.union(miss_channels) != want_channels:
                logger.warning(
                    "The custom `stationxml_storage` did not return "
                    "information about channels %s" %
                    str(want_channels.difference(have_channels.union(
                        miss_channels))))

            if self.miss_station_information:
                self.stationxml_status = STATUS.NEEDS_DOWNLOADING
            elif not self.miss_station_information and \
                    self.have_station_information:
                self.stationxml_status = STATUS.EXISTS
            else:
                self.stationxml_status = STATUS.IGNORE
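
    # Illustrative sketch (not part of the original module): based on the
    # dictionary branch above, a custom ``stationxml_storage`` callable can
    # return a dict with the keys "missing_channels" and "available_channels"
    # (lists of (location, channel) tuples) plus "filename", e.g.:
    #
    #     def stationxml_storage(network, station, channels, start, end):
    #         return {
    #             "available_channels": [],
    #             "missing_channels": channels,
    #             "filename": "stations/%s.%s.xml" % (network, station),
    #         }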

    def prepare_mseed_download(self, mseed_storage):
        """
        Loop through all channels of the station and distribute filenames
        and the current status of the channel.

        A MiniSEED interval will be ignored if the `mseed_storage` function
        returns `True`. Possible statuses after method execution are IGNORE,
        EXISTS, and NEEDS_DOWNLOADING.

        :param mseed_storage:
        """
        for channel in self.channels:
            for interval in channel.intervals:
                interval.filename = utils.get_mseed_filename(
                    mseed_storage, self.network, self.station,
                    channel.location, channel.channel, interval.start,
                    interval.end)
                if interval.filename is True:
                    interval.status = STATUS.IGNORE
                elif os.path.exists(interval.filename):
                    interval.status = STATUS.EXISTS
                else:
                    if not os.path.exists(
                            os.path.dirname(interval.filename)):
                        os.makedirs(os.path.dirname(interval.filename))
                    interval.status = STATUS.NEEDS_DOWNLOADING
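
    # Illustrative sketch (not part of the original module): mirroring the
    # logic above, a custom ``mseed_storage`` callable receives one channel
    # and time interval and either returns the target filename or ``True``
    # to skip that interval, e.g.:
    #
    #     def mseed_storage(network, station, location, channel,
    #                       starttime, endtime):
    #         if network == "XX":
    #             return True  # ignore everything from network XX
    #         return os.path.join("waveforms", "%s.%s.%s.%s__%s.mseed" % (
    #             network, station, location, channel,
    #             starttime.strftime("%Y%m%dT%H%M%S")))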

    def sanitize_downloads(self, logger):
        """
        Should be run after the MiniSEED and StationXML downloads finished.
        It will make sure that every MiniSEED file also has a corresponding
        StationXML file.

        It will delete MiniSEED files but never a StationXML file. The
        logic of the download helpers does not allow for a StationXML file
        with no data.
        """
        from obspy.io.mseed.util import get_start_and_end_time
        # All or nothing for each channel.
        for id in self.miss_station_information.keys():
            logger.warning("Station information could not be downloaded for "
                           "%s.%s.%s.%s. MiniSEED files outside of the "
                           "station information period "
                           "will be deleted!" % (
                               self.network, self.station, id[0], id[1]))
            channel = [_i for _i in self.channels
                       if (_i.location, _i.channel) == id][0]
            for time_interval in channel.intervals:
                # Check that file exists before proceeding
                if not time_interval.filename or \
                        not os.path.isfile(time_interval.filename):
                    continue
                # Check that the time_interval.start and end are correct!
                time_interval.start, time_interval.end = \
                    get_start_and_end_time(time_interval.filename)
                # Only delete downloaded things!
                if time_interval.status == STATUS.DOWNLOADED:
                    # Only delete if the station data are actually missing
                    # for this time
                    miss_start, miss_end = self.miss_station_information[id]
                    if miss_start <= time_interval.start <= miss_end and \
                            miss_start <= time_interval.end <= miss_end:
                        utils.safe_delete(time_interval.filename)
                        time_interval.status = STATUS.DOWNLOAD_REJECTED


class Channel(_SlotsEqualityComparisionObject):
    """
    Object representing a Channel. Each time interval should end up in one
    MiniSEED file.
    """
    __slots__ = ["location", "channel", "intervals"]

    def __init__(self, location, channel, intervals):
        self.location = location
        self.channel = channel
        self.intervals = intervals

    @property
    def needs_station_file(self):
        """
        Determine if the channel requires any station information.

        As soon as the status of at least one interval is either
        ``DOWNLOADED`` or ``EXISTS`` the whole channel will be thought of
        as requiring station information. This does not yet mean that
        station information will be downloaded. That is decided at a later
        stage.
        """
        status = set([_i.status for _i in self.intervals])
        if STATUS.DOWNLOADED in status or STATUS.EXISTS in status:
            return True
        return False

    @property
    def temporal_bounds(self):
        """
        Returns a tuple of the minimum start time and the maximum end time.
        """
        return (min([_i.start for _i in self.intervals]),
                max([_i.end for _i in self.intervals]))

    def __str__(self):
        return "Channel '{location}.{channel}':\n\t{intervals}".format(
            location=self.location, channel=self.channel,
            intervals="\n\t".join([str(i) for i in self.intervals]))


class TimeInterval(_SlotsEqualityComparisionObject):
    """
    Simple object representing a time interval of a channel.

    It knows the temporal bounds of the interval, the (desired) filename,
    and the current status of the interval.

    :param start: The start of the interval.
    :type start: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param end: The end of the interval.
    :type end: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param filename: The filename of the interval.
    :type filename: str
    :param status: The status of the time interval.
    :type status: :class:`~.STATUS`
    """
    __slots__ = ["start", "end", "filename", "status"]

    def __init__(self, start, end, filename=None, status=None):
        self.start = start
        self.end = end
        self.filename = filename
        self.status = status if status is not None else STATUS.NONE

    def __repr__(self):
        return "TimeInterval(start={start}, end={end}, " \
               "filename={filename}, status='{status}')".format(
                   start=repr(self.start),
                   end=repr(self.end),
                   filename="'%s'" % self.filename
                   if self.filename is not None else "None",
                   status=str(self.status))
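

# Minimal construction sketch (illustrative, not part of the original
# module): how the three container classes above nest. They are normally
# populated by ClientDownloadHelper.get_availability(), but can be built by
# hand, e.g.:
#
#     from obspy import UTCDateTime
#
#     t0 = UTCDateTime(2015, 1, 1)
#     intervals = [TimeInterval(start=t0 + 86400 * _i,
#                               end=t0 + 86400 * (_i + 1))
#                  for _i in range(2)]
#     channel = Channel(location="", channel="BHZ", intervals=intervals)
#     station = Station(network="XX", station="ABC", latitude=48.1,
#                       longitude=11.6, channels=[channel])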


class ClientDownloadHelper(object):
    """
    :type client: :class:`obspy.clients.fdsn.client.Client`
    :param client: An initialized FDSN client.
    :type client_name: str
    :param client_name: The name of the client. Only used for logging.
    :type restrictions: :class:`~.restrictions.Restrictions`
    :param restrictions: The non-domain related restrictions for the query.
    :type domain: :class:`~.domain.Domain` subclass
    :param domain: The domain definition.
    :param mseed_storage: The MiniSEED storage settings.
    :param stationxml_storage: The StationXML storage settings.
    :param logger: An active logger instance.
    """

    def __init__(self, client, client_name, restrictions, domain,
                 mseed_storage, stationxml_storage, logger):
        self.client = client
        self.client_name = client_name
        self.restrictions = restrictions
        self.domain = domain
        self.mseed_storage = mseed_storage
        self.stationxml_storage = stationxml_storage
        self.logger = logger
        self.stations = {}
        self.is_availability_reliable = None

    def __bool__(self):
        return bool(len(self))

    def __str__(self):
        avail_map = {
            None: "Unknown reliability of availability information",
            True: "Reliable availability information",
            False: "Non-reliable availability information"
        }
        reliability = avail_map[self.is_availability_reliable]
        return (
            "ClientDownloadHelper object for client '{client}' ({url})\n"
            "-> {reliability}\n"
            "-> Manages {station_count} stations.\n{stations}").format(
                client=self.client_name,
                url=self.client.base_url,
                reliability=reliability,
                station_count=len(self),
                stations="\n".join([str(_i) for _i in
                                    self.stations.values()]))

    def __len__(self):
        return len(self.stations)

    def prepare_mseed_download(self):
        """
        Prepare each Station for the MiniSEED downloading stage.

        This will distribute filenames and identify files that require
        downloading.
        """
        for station in self.stations.values():
            station.prepare_mseed_download(mseed_storage=self.mseed_storage)

    def filter_stations_based_on_minimum_distance(
            self, existing_client_dl_helpers):
        """
        Removes stations until all stations have a certain minimum distance
        to each other.

        Returns the rejected stations which is mainly useful for testing.

        :param existing_client_dl_helpers: Instances of already existing
            client download helpers.
        :type existing_client_dl_helpers: list of
            :class:`~.ClientDownloadHelper`
        """
        if not self.restrictions.minimum_interstation_distance_in_m:
            # No rejected stations.
            return []

        # Create a sorted copy that will be used in the following. Make it
        # more deterministic by sorting the stations based on the id.
        stations = copy.copy(list(self.stations.values()))
        stations = sorted(stations, key=lambda x: (x.network, x.station))

        existing_stations = []
        for dlh in existing_client_dl_helpers:
            existing_stations.extend(list(dlh.stations.values()))

        remaining_stations = []
        rejected_stations = []

        # There are essentially two possibilities. If no station exists yet,
        # it will choose the largest subset of stations satisfying the
        # minimum inter-station distance constraint.
        if not existing_stations:
            # Build k-d-tree and query for the neighbours of each point
            # within the minimum distance.
            kd_tree = utils.SphericalNearestNeighbour(stations)
            nns = kd_tree.query_pairs(
                self.restrictions.minimum_interstation_distance_in_m)

            indexes_to_remove = []
            # Keep removing the station with the most pairs until no pairs
            # are left.
            while nns:
                most_common = collections.Counter(
                    itertools.chain.from_iterable(nns)).most_common()[0][0]
                indexes_to_remove.append(most_common)
                nns = list(filterfalse(lambda x: most_common in x, nns))

            # Removing these indices results in a set of stations we wish
            # to keep.
            new_remaining_stations = [_i[1] for _i in enumerate(stations)
                                      if _i[0] not in indexes_to_remove]
            new_rejected_stations = [_i[1] for _i in enumerate(stations)
                                     if _i[0] in indexes_to_remove]

            # Station objects are not hashable thus we have to go the long
            # route.
            for st in new_remaining_stations:
                if st not in remaining_stations:
                    remaining_stations.append(st)

            for st in new_rejected_stations:
                if st not in rejected_stations:
                    rejected_stations.append(st)

        # Otherwise it will add new stations approximating a Poisson disk
        # distribution.
        else:
            while stations:
                # kd-tree with all existing_stations
                existing_kd_tree = utils.SphericalNearestNeighbour(
                    existing_stations)
                # Now we have to get the distance to the closest existing
                # station for all new stations.
                distances = np.ma.array(existing_kd_tree.query(stations)[0])
                if np.isinf(distances[0]):
                    break
                distances.mask = False

                # Step one is to get rid of all stations that are closer
                # than the minimum distance to any existing station.
                remove = np.where(
                    distances <
                    self.restrictions.minimum_interstation_distance_in_m)[0]
                rejected_stations.extend([stations[_i] for _i in remove])

                keep = np.where(
                    distances >=
                    self.restrictions.minimum_interstation_distance_in_m)[0]
                distances.mask[remove] = True

                if len(keep):
                    # Station with the largest distance to next closer
                    # station.
                    largest = np.argmax(distances)
                    remaining_stations.append(stations[largest])
                    existing_stations.append(stations[largest])

                    # Add all rejected stations here.
                    stations = [stations[_i] for _i in keep if _i != largest]
                else:
                    stations = []

        # Now actually delete the files and everything of the rejected
        # stations.
        for station in rejected_stations:
            station.remove_files(logger=self.logger,
                                 reason="Minimum distance filtering.")
        self.stations = {}
        for station in remaining_stations:
            self.stations[(station.network, station.station)] = station

        # Return the rejected stations.
        return {(_i.network, _i.station): _i for _i in rejected_stations}
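
    # Illustrative trace (not part of the original module) of the greedy
    # pair elimination above: if query_pairs() returned the index pairs
    # {(0, 1), (0, 2), (1, 3)}, stations 0 and 1 each occur in two pairs.
    # Dropping, say, station 0 first leaves {(1, 3)}; dropping station 1
    # next leaves no pairs, so stations 2 and 3 (and every station without
    # a too-close neighbour) are kept.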

    def prepare_stationxml_download(self):
        """
        Prepare each Station for the StationXML downloading stage.

        This will distribute filenames and identify files that require
        downloading.
        """
        for station in self.stations.values():
            station.prepare_stationxml_download(
                stationxml_storage=self.stationxml_storage,
                logger=self.logger)

    def download_stationxml(self, threads=3):
        """
        Actually download the StationXML files.

        :param threads: Limits the maximum number of threads for the client.
        """

        def star_download_station(args):
            """
            Maps arguments to the utils.download_stationxml() function.

            :param args: The to-be mapped arguments.
            """
            try:
                ret_val = utils.download_stationxml(*args,
                                                    logger=self.logger)
            except utils.ERRORS as e:
                self.logger.error(str(e))
                return None
            return ret_val

        # Build up everything we want to download.
        arguments = []
        for station in self.stations.values():
            if not station.miss_station_information:
                continue
            s, e = station.temporal_bounds
            if self.restrictions.station_starttime:
                s = self.restrictions.station_starttime
            if self.restrictions.station_endtime:
                e = self.restrictions.station_endtime
            bulk = [(station.network, station.station, channel.location,
                     channel.channel, s, e) for channel in station.channels]
            arguments.append((self.client, self.client_name, bulk,
                              station.stationxml_filename))

        if not arguments:
            self.logger.info("Client '%s' - No station information to "
                             "download." % self.client_name)
            return

        # Download it.
        s_time = timeit.default_timer()
        pool = ThreadPool(min(threads, len(arguments)))
        results = pool.map(star_download_station, arguments)
        pool.close()
        e_time = timeit.default_timer()

        results = [_i for _i in results if _i is not None]

        # Check it.
        filecount = 0
        download_size = 0

        # Update the station structures. Loop over each returned file.
        for s_id, filename in results:
            filecount += 1
            station = self.stations[s_id]
            size = os.path.getsize(filename)
            download_size += size

            # Extract information about that file.
            try:
                info = utils.get_stationxml_contents(filename)
            # Sometimes some services choose to not return XML files - guard
            # against it and just delete the file. At subsequent runs the
            # mass downloader will attempt to download it again.
            except XMLSyntaxError:
                self.logger.info(
                    "Client '%s' - File %s is not an XML file - it will be "
                    "deleted." % (self.client_name, filename))
                utils.safe_delete(filename)
                continue

            still_missing = {}
            # Make sure all missing information has been downloaded by
            # looping over each channel of the station that originally
            # requested to be downloaded.
            for c_id, times in station.miss_station_information.items():
                # Get the temporal range of information in the file.
                c_info = [_i for _i in info if
                          _i.network == station.network and
                          _i.station == station.station and
                          _i.location == c_id[0] and
                          _i.channel == c_id[1]]
                if not c_info:
                    continue
                starttime = min([_i.starttime for _i in c_info])
                endtime = max([_i.endtime for _i in c_info])
                if starttime > times[0] or endtime < times[1]:
                    # Cope with the case that not the full day of station
                    # information is missing.
                    if starttime < times[1]:
                        still_missing[c_id] = (times[0], starttime)
                        station.have_station_information[c_id] = \
                            (starttime, times[1])
                    elif endtime > times[0]:
                        still_missing[c_id] = (endtime, times[1])
                        station.have_station_information[c_id] = \
                            (times[0], endtime)
                    else:
                        still_missing[c_id] = times
                    continue
                station.have_station_information[c_id] = times

            station.miss_station_information = still_missing
            if still_missing:
                station.stationxml_status = STATUS.DOWNLOAD_PARTIALLY_FAILED
            else:
                station.stationxml_status = STATUS.DOWNLOADED

        # Now loop over all stations and set the status of the ones that
        # still need downloading to download failed.
        for station in self.stations.values():
            if station.stationxml_status == STATUS.NEEDS_DOWNLOADING:
                station.stationxml_status = STATUS.DOWNLOAD_FAILED

        self.logger.info("Client '%s' - Downloaded %i station files "
                         "[%.1f MB] in %.1f seconds [%.2f KB/sec]." % (
                             self.client_name, filecount,
                             download_size / 1024.0 ** 2,
                             e_time - s_time,
                             (download_size / 1024.0) / (e_time - s_time)))

    def download_mseed(self, chunk_size_in_mb=25, threads_per_client=3):
        """
        Actually download MiniSEED data.

        :param chunk_size_in_mb: Attempt to download data in chunks of this
            size.
        :param threads_per_client: Threads to launch per client. 3 seems to
            be a value in agreement with some data centers.
        """
        # Estimate the download size to have equally sized chunks.
        channel_sampling_rate = {
            "F": 5000, "G": 5000, "D": 1000, "C": 1000, "E": 250, "S": 80,
            "H": 250, "B": 80, "M": 10, "L": 1, "V": 0.1, "U": 0.01,
            "R": 0.001, "P": 0.0001, "T": 0.00001, "Q": 0.000001, "A": 5000,
            "O": 5000}

        # Split into chunks of about equal size in terms of filesize.
        chunks = []
        chunks_curr = []
        curr_chunks_mb = 0

        # Don't request more than 50 chunks at once to not choke the
        # servers.
        max_chunk_length = 50

        counter = collections.Counter()

        # Keep track of attempted downloads.
        for sta in self.stations.values():
            for cha in sta.channels:
                # The band code is used to estimate the sampling rate of
                # the data to be downloaded.
                band_code = cha.channel[0].upper()
                try:
                    sr = channel_sampling_rate[band_code]
                except KeyError:
                    # Generic sampling rate for exotic band codes.
                    sr = 1.0

                for interval in cha.intervals:
                    counter[interval.status] += 1
                    # Only take those time intervals that actually require
                    # some downloading.
                    if interval.status != STATUS.NEEDS_DOWNLOADING:
                        continue
                    chunks_curr.append((
                        sta.network, sta.station, cha.location, cha.channel,
                        interval.start, interval.end, interval.filename))

                    # Assume that each sample needs 4 byte, STEIM
                    # compression reduces size to about a third.
                    # chunk size is in MB
                    duration = interval.end - interval.start
                    curr_chunks_mb += \
                        sr * duration * 4.0 / 3.0 / 1024.0 / 1024.0
                    if curr_chunks_mb >= chunk_size_in_mb or \
                            len(chunks_curr) >= max_chunk_length:
                        chunks.append(chunks_curr)
                        chunks_curr = []
                        curr_chunks_mb = 0
        if chunks_curr:
            chunks.append(chunks_curr)

        keys = sorted(counter.keys())
        for key in keys:
            self.logger.info(
                "Client '%s' - Status for %i time intervals/channels before "
                "downloading: %s" % (self.client_name, counter[key],
                                     key.upper()))

        if not chunks:
            return

        def star_download_mseed(args):
            """
            Star maps the arguments to the
            utils.download_and_split_mseed_bulk() function.

            :param args: The arguments to be passed.
            """
            try:
                ret_val = utils.download_and_split_mseed_bulk(
                    *args, logger=self.logger)
            except utils.ERRORS as e:
                msg = ("Client '%s' - " % args[1]) + str(e)
                if "no data available" in msg.lower():
                    self.logger.info(
                        msg.split("Detailed response")[0].strip())
                else:
                    self.logger.error(msg)
                return []
            return ret_val

        pool = ThreadPool(min(threads_per_client, len(chunks)))

        d_start = timeit.default_timer()
        pool.map(
            star_download_mseed,
            [(self.client, self.client_name, chunk) for chunk in chunks])
        pool.close()
        d_end = timeit.default_timer()

        self.logger.info("Client '%s' - Launching basic QC checks..." %
                         self.client_name)
        downloaded_bytes, discarded_bytes = self._check_downloaded_data()
        total_bytes = downloaded_bytes + discarded_bytes

        self.logger.info("Client '%s' - Downloaded %.1f MB [%.2f KB/sec] "
                         "of data, %.1f MB of which were discarded "
                         "afterwards." % (
                             self.client_name,
                             total_bytes / 1024.0 ** 2,
                             total_bytes / 1024.0 / (d_end - d_start),
                             discarded_bytes / 1024.0 ** 2))

        # Recount everything to be able to emit some nice statistics.
        counter = collections.Counter()
        for sta in self.stations.values():
            for chan in sta.channels:
                for interval in chan.intervals:
                    counter[interval.status] += 1
        keys = sorted(counter.keys())
        for key in keys:
            self.logger.info(
                "Client '%s' - Status for %i time intervals/channels after "
                "downloading: %s" % (
                    self.client_name, counter[key], key.upper()))

        self._remove_failed_and_ignored_stations()
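
    # Worked size estimate (illustrative, not part of the original module):
    # a one-day interval of a band code "B" channel (assumed 80 Hz above)
    # contributes roughly 80 * 86400 * 4 / 3 / 1024 / 1024 ~= 8.8 MB, so
    # with the default chunk_size_in_mb=25 about three such day-long
    # intervals are grouped into one bulk request.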

    def _remove_failed_and_ignored_stations(self):
        """
        Removes all stations that have no time interval with either an
        "exists" or a "downloaded" status.
        """
        to_be_removed_keys = []
        for key, station in self.stations.items():
            if station.has_existing_or_downloaded_time_intervals is True:
                continue
            to_be_removed_keys.append(key)
        for key in to_be_removed_keys:
            del self.stations[key]

    def sanitize_downloads(self):
        """
        Should be run after the MiniSEED and StationXML downloads finished.
        It will make sure that every MiniSEED file also has a corresponding
        StationXML file.
        """
        for station in self.stations.values():
            station.sanitize_downloads(logger=self.logger)

    def _check_downloaded_data(self):
        """
        Read the downloaded data, set the proper status flags and remove
        data that does not meet the QC criteria. It just checks the
        downloaded data for minimum length and gaps/overlaps.

        Returns the downloaded_bytes and the discarded_bytes.
        """
        downloaded_bytes = 0
        discarded_bytes = 0
        for sta in self.stations.values():
            for cha in sta.channels:
                for interval in cha.intervals:
                    # The status of the interval should not have changed if
                    # it did not require downloading in the first place.
                    if interval.status != STATUS.NEEDS_DOWNLOADING:
                        continue

                    # If the file does not exist, mark the time interval as
                    # download failed.
                    if not os.path.exists(interval.filename):
                        interval.status = STATUS.DOWNLOAD_FAILED
                        continue

                    size = os.path.getsize(interval.filename)
                    if size == 0:
                        self.logger.warning("Zero byte file '%s'. Will be "
                                            "deleted." % interval.filename)
                        utils.safe_delete(interval.filename)
                        interval.status = STATUS.DOWNLOAD_FAILED
                        continue

                    # Guard against faulty files.
                    try:
                        st = obspy.read(interval.filename, headonly=True)
                    except Exception as e:
                        self.logger.warning(
                            "Could not read file '%s' due to: %s\n"
                            "Will be discarded." % (interval.filename,
                                                    str(e)))
                        utils.safe_delete(interval.filename)
                        discarded_bytes += size
                        interval.status = STATUS.DOWNLOAD_FAILED
                        continue

                    # Valid files with no data.
                    if len(st) == 0:
                        self.logger.warning(
                            "Empty file '%s'. Will be deleted." %
                            interval.filename)
                        utils.safe_delete(interval.filename)
                        discarded_bytes += size
                        interval.status = STATUS.DOWNLOAD_FAILED
                        continue

                    # If user did not want gappy files, remove them.
                    if self.restrictions.reject_channels_with_gaps is \
                            True and len(st) > 1:
                        self.logger.info(
                            "File '%s' has %i traces and thus contains "
                            "gaps or overlaps. Will be deleted." % (
                                interval.filename, len(st)))
                        utils.safe_delete(interval.filename)
                        discarded_bytes += size
                        interval.status = STATUS.DOWNLOAD_REJECTED
                        continue

                    if self.restrictions.minimum_length:
                        duration = sum([tr.stats.endtime -
                                        tr.stats.starttime for tr in st])
                        expected_min_duration = \
                            self.restrictions.minimum_length * \
                            (interval.end - interval.start)
                        if duration < expected_min_duration:
                            self.logger.info(
                                "File '%s' has only %.2f seconds of data. "
                                "%.2f are required. File will be deleted."
                                % (interval.filename, duration,
                                   expected_min_duration))
                            utils.safe_delete(interval.filename)
                            discarded_bytes += size
                            interval.status = STATUS.DOWNLOAD_REJECTED
                            continue

                    downloaded_bytes += size
                    interval.status = STATUS.DOWNLOADED
        return downloaded_bytes, discarded_bytes
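
    # Worked example (illustrative, not part of the original module): with
    # restrictions.minimum_length = 0.9 and a one-hour time interval, a
    # file is kept only if its traces sum to at least 0.9 * 3600 s = 3240 s
    # of data; shorter files are deleted and the interval is marked
    # DOWNLOAD_REJECTED.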

    def _parse_miniseed_filenames(self, filenames, restrictions):
        time_range = restrictions.minimum_length * (restrictions.endtime -
                                                    restrictions.starttime)
        channel_availability = []
        for filename in filenames:
            st = obspy.read(filename, format="MSEED", headonly=True)
            if restrictions.reject_channels_with_gaps and len(st) > 1:
                self.logger.warning("Channel %s has gap or overlap. Will "
                                    "be removed." % st[0].id)
                try:
                    os.remove(filename)
                except OSError:
                    pass
                continue
            elif len(st) == 0:
                self.logger.error("MiniSEED file with no data detected. "
                                  "Should not happen!")
                continue
            tr = st[0]
            duration = tr.stats.endtime - tr.stats.starttime
            if restrictions.minimum_length and duration < time_range:
                self.logger.warning("Channel %s does not satisfy the "
                                    "minimum length requirement. %.2f "
                                    "seconds instead of the required %.2f "
                                    "seconds." % (tr.id, duration,
                                                  time_range))
                try:
                    os.remove(filename)
                except OSError:
                    pass
                continue
            channel_availability.append(utils.ChannelAvailability(
                tr.stats.network, tr.stats.station, tr.stats.location,
                tr.stats.channel, tr.stats.starttime, tr.stats.endtime,
                filename))
        return channel_availability

    def discard_stations(self, existing_client_dl_helpers):
        """
        Discard all stations part of any of the already existing client
        download helper instances.

        The station discarding happens purely based on station ids.

        :param existing_client_dl_helpers: Instances of already existing
            client download helpers. All stations part of this will not be
            downloaded anymore.
        :type existing_client_dl_helpers: list of
            :class:`~.ClientDownloadHelper`
        """
        station_ids = []
        for helper in existing_client_dl_helpers:
            station_ids.extend(helper.stations.keys())

        for station_id in station_ids:
            try:
                del self.stations[station_id]
            except KeyError:
                pass

    def get_availability(self):
        """
        Queries the current client for information on what stations are
        available given the spatial and temporal restrictions.
        """
        # Check if stations need to be filtered after downloading or if the
        # restrictions one can impose with the FDSN webservices queries are
        # enough. This depends on the domain definition.
        try:
            self.domain.is_in_domain(0, 0)
            needs_filtering = True
        except NotImplementedError:
            needs_filtering = False

        arguments = {
            "network": self.restrictions.network,
            "station": self.restrictions.station,
            "location": self.restrictions.location,
            "channel": self.restrictions.channel,
            "starttime": self.restrictions.starttime,
            "endtime": self.restrictions.endtime,
            # Request at the channel level.
            "level": "channel"
        }

        # Add the domain specific query parameters.
        arguments.update(self.domain.get_query_parameters())

        # Check the capabilities of the service and see what is the most
        # appropriate way of acquiring availability information. Some
        # services right now require manual overriding of what they claim
        # to be capable of.
        if "matchtimeseries" in self.client.services["station"]:
            arguments["matchtimeseries"] = True
            if "format" in self.client.services["station"]:
                arguments["format"] = "text"
            self.is_availability_reliable = True
        else:
            if "format" in self.client.services["station"]:
                arguments["format"] = "text"
            self.is_availability_reliable = False

        if self.is_availability_reliable:
            self.logger.info("Client '%s' - Requesting reliable "
                             "availability." % self.client_name)
        else:
            self.logger.info(
                "Client '%s' - Requesting unreliable availability." %
                self.client_name)

        try:
            start = time.time()
            inv = self.client.get_stations(**arguments)
            end = time.time()
        except utils.ERRORS as e:
            if "no data available" in str(e).lower():
                self.logger.info(
                    "Client '%s' - No data available for request." %
                    self.client_name)
                return
            self.logger.error(
                "Client '{0}' - Failed getting availability: %s".format(
                    self.client_name), str(e))
            return
        # This sometimes fires if a service returns some random stuff which
        # is not a valid station file.
        except Exception as e:
            self.logger.error(
                "Client '{0}' - Failed getting availability due to "
                "unexpected exception: %s".format(self.client_name), str(e))
            return

        self.logger.info("Client '%s' - Successfully requested availability "
                         "(%.2f seconds)" % (self.client_name, end - start))

        # Get the time intervals from the restrictions.
        intervals = [TimeInterval(start=_i[0], end=_i[1])
                     for _i in self.restrictions]

        for network in inv:
            # Skip network if so desired.
            skip_network = False
            for pattern in self.restrictions.exclude_networks:
                if fnmatch.fnmatch(network.code, pattern):
                    skip_network = True
                    break
            if skip_network:
                continue

            for station in network:
                # Skip station if so desired.
                skip_station = False
                for pattern in self.restrictions.exclude_stations:
                    if fnmatch.fnmatch(station.code, pattern):
                        skip_station = True
                        break
                if skip_station:
                    continue

                # If an inventory is given, only keep stations part of the
                # inventory.
                if self.restrictions.limit_stations_to_inventory is not \
                        None and (network.code, station.code) not in \
                        self.restrictions.limit_stations_to_inventory:
                    continue

                # Skip the station if it is not in the desired domain.
                if needs_filtering is True and \
                        not self.domain.is_in_domain(station.latitude,
                                                     station.longitude):
                    continue

                channels = []
                for channel in station.channels:
                    # Remove channels that somehow slipped past the
                    # temporal constraints due to weird behaviour from the
                    # data center.
                    if (channel.start_date > self.restrictions.endtime) or \
                            (channel.end_date <
                             self.restrictions.starttime):
                        continue
                    new_channel = Channel(
                        location=channel.location_code,
                        channel=channel.code,
                        intervals=copy.deepcopy(intervals))
                    # Multiple channel epochs would result in duplicate
                    # channels which we don't want. Bit of a silly logic
                    # here to get rid of them.
                    if new_channel not in channels:
                        channels.append(new_channel)

                if self.restrictions.channel is None:
                    # Group by locations and apply the channel priority
                    # filter to each.
                    filtered_channels = []

                    def get_loc(x):
                        return x.location

                    for location, _channels in itertools.groupby(
                            sorted(channels, key=get_loc), get_loc):
                        filtered_channels.extend(
                            utils.filter_channel_priority(
                                list(_channels), key="channel",
                                priorities=self.restrictions
                                .channel_priorities))
                    channels = filtered_channels

                if self.restrictions.location is None:
                    # Filter to remove unwanted locations according to the
                    # priority list.
                    has_channels_before_filtering = bool(channels)
                    channels = utils.filter_channel_priority(
                        channels, key="location",
                        priorities=self.restrictions.location_priorities)
                    # This has been a point of confusion for users so raise
                    # a warning in case this removed all channels and is
                    # still using the default settings.
                    if not channels and has_channels_before_filtering and \
                            self.restrictions._loc_prios_are_default_values:
                        self.logger.warning(
                            "Client '%s' - No channel at station %s.%s has "
                            "been selected due to the `location_priorities` "
                            "settings." % (self.client_name, network.code,
                                           station.code))

                if not channels:
                    continue

                self.stations[(network.code, station.code)] = Station(
                    network=network.code,
                    station=station.code,
                    latitude=station.latitude,
                    longitude=station.longitude,
                    channels=channels)
        self.logger.info("Client '%s' - Found %i stations (%i channels)." % (
            self.client_name, len(self.stations),
            sum([len(_i.channels) for _i in self.stations.values()])))
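

# Usage sketch (illustrative, not part of the original module): a
# ClientDownloadHelper is normally driven by the MassDownloader in roughly
# this order:
#
#     helper = ClientDownloadHelper(
#         client=client, client_name="IRIS", restrictions=restrictions,
#         domain=domain, mseed_storage="waveforms",
#         stationxml_storage="stations", logger=logger)
#     helper.get_availability()
#     helper.prepare_mseed_download()
#     helper.download_mseed()
#     helper.prepare_stationxml_download()
#     helper.download_stationxml()
#     helper.sanitize_downloads()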


if __name__ == '__main__':
    import doctest
    doctest.testmod(exclude_empty=True)