Source code for obspy.clients.fdsn.mass_downloader.download_helpers

# -*- coding: utf-8 -*-
"""
Helpers for the mass downloader.

Intended to simplify and stabilize the logic of the mass downloader and make
it understandable in the first place.

:copyright:
    Lion Krischer (krischer@geophysik.uni-muenchen.de), 2014-2015
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import collections
import copy
import fnmatch
import itertools
import os
import time
import timeit
from itertools import filterfalse
from multiprocessing.pool import ThreadPool

import numpy as np
from lxml.etree import XMLSyntaxError

import obspy
from obspy.core.util import Enum

from . import utils

#: The current status of an entity.
STATUS = Enum(["none", "needs_downloading", "downloaded", "ignore", "exists",
               "download_failed", "download_rejected",
               "download_partially_failed"])


[docs] class _SlotsEqualityComparisionObject(object): """ Helper object with an equality comparision method simply comparing all slotted attributes. """ __slots__ = []
[docs] def __eq__(self, other): if type(self) is not type(other): return False return all([getattr(self, _i) == getattr(other, _i) for _i in self.__slots__])
[docs] class Station(_SlotsEqualityComparisionObject): """ Object representing a seismic station within the download helper classes. It knows the coordinates of the station to perform the filtering, its channels and the filename and status of the StationXML files. :param network: The network code. :type network: str :param station: The station code. :type station: str :param latitude: The latitude of the station. :type latitude: float :param longitude: The longitude of the station. :type longitude: float :param channels: The channels of the station. :type channels: list[:class:`~.Channel`] :param stationxml_filename: The filename of the StationXML file. :type stationxml_filename: str :param stationxml_status: The current status of the station. :type stationxml_status: :py:attr:`~.STATUS` """ __slots__ = ["network", "station", "latitude", "longitude", "channels", "_stationxml_filename", "want_station_information", "miss_station_information", "have_station_information", "stationxml_status"]
[docs] def __init__(self, network, station, latitude, longitude, channels, stationxml_filename=None, stationxml_status=None): # Station attributes. self.network = network self.station = station self.latitude = latitude self.longitude = longitude self.channels = channels # Station information settings. self.stationxml_filename = stationxml_filename self.stationxml_status = stationxml_status and STATUS.NONE # Internally keep track of which channels and time interval want # station information, which miss station information and which # already have some. want_station_information should always be the # union of miss and have. self.want_station_information = {} self.miss_station_information = {} self.have_station_information = {}
@property def has_existing_or_downloaded_time_intervals(self): """ Returns true if any of the station's time intervals have status "DOWNLOADED" or "EXISTS". Otherwise it returns False meaning it does not have to be considered anymore. """ status = set() for chan in self.channels: for ti in chan.intervals: status.add(ti.status) if STATUS.EXISTS in status or STATUS.DOWNLOADED in status: return True return False @property def has_existing_time_intervals(self): """ Returns True if any of the station's time intervals already exist. """ for chan in self.channels: for ti in chan.intervals: if ti.status == STATUS.EXISTS: return True return False
[docs] def remove_files(self, logger, reason): """ Delete all files under it. Only delete stuff that actually has been downloaded! """ for chan in self.channels: for ti in chan.intervals: if ti.status != STATUS.DOWNLOADED or not ti.filename: continue if os.path.exists(ti.filename): logger.info("Deleting MiniSEED file '%s'. Reason: %s" % ( ti.filename, reason)) utils.safe_delete(ti.filename) if self.stationxml_status == STATUS.DOWNLOADED and \ self.stationxml_filename and \ os.path.exists(self.stationxml_filename): logger.info("Deleting StationXMl file '%s'. Reason: %s" % (self.stationxml_filename, reason)) utils.safe_delete(self.stationxml_filename)
@property def stationxml_filename(self): return self._stationxml_filename @stationxml_filename.setter def stationxml_filename(self, value): """ Setter creating the directory for the file if it does not already exist. """ self._stationxml_filename = value if not value: return dirname = os.path.dirname(value) if not os.path.exists(dirname): os.makedirs(dirname) @property def temporal_bounds(self): """ Return the temporal bounds for the station. """ starttimes = [] endtimes = [] for channel in self.channels: s, e = channel.temporal_bounds starttimes.append(s) endtimes.append(e) return min(starttimes), max(endtimes)
[docs] def __str__(self): channels = "\n".join(str(i) for i in self.channels) channels = "\n\t".join(channels.splitlines()) return ( "Station '{network}.{station}' [Lat: {lat:.2f}, Lng: {lng:.2f}]\n" "\t-> Filename: {filename} ({status})\n" "\t-> Wants station information for channels: {want}\n" "\t-> Has station information for channels: {has}\n" "\t-> Misses station information for channels: {miss}\n" "\t{channels}" ).format( network=self.network, station=self.station, lat=self.latitude, lng=self.longitude, filename=self.stationxml_filename, status="exists" if (self.stationxml_filename and os.path.exists( self.stationxml_filename)) else "does not yet exist", want=", ".join(["%s.%s" % (_i[0], _i[1]) for _i in self.want_station_information.keys()]), has=", ".join(["%s.%s" % (_i[0], _i[1]) for _i in self.have_station_information.keys()]), miss=", ".join(["%s.%s" % (_i[0], _i[1]) for _i in self.miss_station_information.keys()]), channels=channels)
[docs] def prepare_stationxml_download(self, stationxml_storage, logger): """ Figure out what to download. :param stationxml_storage: """ # Determine what channels actually want to have station information. # This will be a tuple of location code, channel code, starttime, # and endtime. self.want_station_information = {} for channel in self.channels: if channel.needs_station_file is False: continue self.want_station_information[ (channel.location, channel.channel)] = channel.temporal_bounds # No channel has any data, thus nothing will happen. if not self.want_station_information: self.stationxml_status = STATUS.NONE return # Only those channels that now actually want station information # will be treated in the following. s, e = self.temporal_bounds storage = utils.get_stationxml_filename( stationxml_storage, self.network, self.station, list(self.want_station_information.keys()), starttime=s, endtime=e) # The simplest case. The function returns a string. Now two things # can happen. if isinstance(storage, (str, bytes)): filename = storage self.stationxml_filename = filename # 1. The file does not yet exist. Thus all channels must be # downloaded. if not os.path.exists(filename): self.miss_station_information = \ copy.deepcopy(self.want_station_information) self.have_station_information = {} self.stationxml_status = STATUS.NEEDS_DOWNLOADING return # 2. The file does exist. It will be parsed. If it contains ALL # necessary information, nothing will happen. Otherwise it will # be overwritten. else: info = utils.get_stationxml_contents(filename) for c_id, times in self.want_station_information.items(): # Get the temporal range of information in the file. c_info = [_i for _i in info if _i.network == self.network and _i.station == self.station and _i.location == c_id[0] and _i.channel == c_id[1]] if not c_info: break starttime = min([_i.starttime for _i in c_info]) endtime = max([_i.endtime for _i in c_info]) if starttime > times[0] or endtime < times[1]: break # All good if no break is called. else: self.have_station_information = \ copy.deepcopy(self.want_station_information) self.miss_station_information = {} self.stationxml_status = STATUS.EXISTS return # Otherwise everything will be downloaded. self.miss_station_information = \ copy.deepcopy(self.want_station_information) self.have_station_information = {} self.stationxml_status = STATUS.NEEDS_DOWNLOADING return # The other possibility is that a dictionary is returned. else: # The types are already checked by the get_stationxml_filename() # function. missing_channels = storage["missing_channels"] available_channels = storage["available_channels"] # Get the channels wanting station information and filter them. channels_wanting_station_information = copy.deepcopy( self.want_station_information ) # Figure out what channels are missing and will be downloaded. self.miss_station_information = {} for channel in missing_channels: if channel not in channels_wanting_station_information: continue self.miss_station_information[channel] = \ channels_wanting_station_information[channel] # Same thing but with the already available channels. self.have_station_information = {} for channel in available_channels: if channel not in channels_wanting_station_information: continue self.have_station_information[channel] = \ channels_wanting_station_information[channel] self.stationxml_filename = storage["filename"] # Raise a warning if something is missing, but do not raise an # exception or halt the program at this point. have_channels = set(self.have_station_information.keys()) miss_channels = set(self.miss_station_information.keys()) want_channels = set(self.want_station_information.keys()) if have_channels.union(miss_channels) != want_channels: logger.warning( "The custom `stationxml_storage` did not return " "information about channels %s" % str(want_channels.difference(have_channels.union( miss_channels)))) if self.miss_station_information: self.stationxml_status = STATUS.NEEDS_DOWNLOADING elif not self.miss_station_information and \ self.have_station_information: self.stationxml_status = STATUS.EXISTS else: self.stationxml_status = STATUS.IGNORE
[docs] def prepare_mseed_download(self, mseed_storage): """ Loop through all channels of the station and distribute filenames and the current status of the channel. A MiniSEED interval will be ignored, if the `mseed_storage` function returns `True`. Possible statuses after method execution are IGNORE, EXISTS, and NEEDS_DOWNLOADING. :param mseed_storage: """ for channel in self.channels: for interval in channel.intervals: interval.filename = utils.get_mseed_filename( mseed_storage, self.network, self.station, channel.location, channel.channel, interval.start, interval.end) if interval.filename is True: interval.status = STATUS.IGNORE elif os.path.exists(interval.filename): interval.status = STATUS.EXISTS else: if not os.path.exists(os.path.dirname(interval.filename)): os.makedirs(os.path.dirname(interval.filename)) interval.status = STATUS.NEEDS_DOWNLOADING
[docs] def sanitize_downloads(self, logger): """ Should be run after the MiniSEED and StationXML downloads finished. It will make sure that every MiniSEED file also has a corresponding StationXML file. It will delete MiniSEED files but never a StationXML file. The logic of the download helpers does not allow for a StationXML file with no data. """ from obspy.io.mseed.util import get_start_and_end_time # All or nothing for each channel. for id in self.miss_station_information.keys(): logger.warning("Station information could not be downloaded for " "%s.%s.%s.%s. MiniSEED files outside of the " "station information period " "will be deleted!" % ( self.network, self.station, id[0], id[1])) channel = [_i for _i in self.channels if (_i.location, _i.channel) == id][0] for time_interval in channel.intervals: # Check that file exists before proceeding if not time_interval.filename or \ not os.path.isfile(time_interval.filename): continue # Check that the time_interval.start and end are correct! time_interval.start, time_interval.end = \ get_start_and_end_time(time_interval.filename) # Only delete downloaded things! if time_interval.status == STATUS.DOWNLOADED: # Only delete if the station data are actually missing # for this time miss_start, miss_end = self.miss_station_information[id] if miss_start <= time_interval.start <= miss_end and \ miss_start <= time_interval.end <= miss_end: utils.safe_delete(time_interval.filename) time_interval.status = STATUS.DOWNLOAD_REJECTED
[docs] class Channel(_SlotsEqualityComparisionObject): """ Object representing a Channel. Each time interval should end up in one MiniSEED file. """ __slots__ = ["location", "channel", "intervals"]
[docs] def __init__(self, location, channel, intervals): self.location = location self.channel = channel self.intervals = intervals
@property def needs_station_file(self): """ Determine if the channel requires any station information. As soon as the status of at least one interval is either ``DOWNLOADED`` or ``EXISTS`` the whole channel will be thought of as requiring station information. This does not yet mean that station information will be downloaded. That is decided at a later stage. """ status = set([_i.status for _i in self.intervals]) if STATUS.DOWNLOADED in status or STATUS.EXISTS in status: return True return False @property def temporal_bounds(self): """ Returns a tuple of the minimum start time and the maximum end time. """ return (min([_i.start for _i in self.intervals]), max([_i.end for _i in self.intervals]))
[docs] def __str__(self): return "Channel '{location}.{channel}':\n\t{intervals}".format( location=self.location, channel=self.channel, intervals="\n\t".join([str(i) for i in self.intervals]))
[docs] class TimeInterval(_SlotsEqualityComparisionObject): """ Simple object representing a time interval of a channel. It knows the temporal bounds of the interval, the (desired) filename, and the current status of the interval. :param start: The start of the interval. :type start: :class:`~obspy.core.utcdatetime.UTCDateTime` :param end: The end of the interval. :type end: :class:`~obspy.core.utcdatetime.UTCDateTime` :param filename: The filename of the interval. :type filename: str :param status: The status of the time interval. :param status: :py:attr:`~.STATUS` """ __slots__ = ["start", "end", "filename", "status"]
[docs] def __init__(self, start, end, filename=None, status=None): self.start = start self.end = end self.filename = filename self.status = status if status is not None else STATUS.NONE
[docs] def __repr__(self): return "TimeInterval(start={start}, end={end}, filename={filename}, " \ "status='{status}')".format( start=repr(self.start), end=repr(self.end), filename="'%s'" % self.filename if self.filename is not None else "None", status=str(self.status))
[docs] class ClientDownloadHelper(object): """ :type client: :class:`obspy.clients.fdsn.client.Client` :param client: An initialized FDSN client. :type client_name: str :param client_name: The name of the client. Only used for logging. :type restrictions: :class:`~.restrictions.Restrictions` :param restrictions: The non-domain related restrictions for the query. :type domain: :class:`~.domain.Domain` subclass :param domain: The domain definition. :param mseed_storage: The MiniSEED storage settings. :param stationxml_storage: The StationXML storage settings. :param logger: An active logger instance. """
[docs] def __init__(self, client, client_name, restrictions, domain, mseed_storage, stationxml_storage, logger): self.client = client self.client_name = client_name self.restrictions = restrictions self.domain = domain self.mseed_storage = mseed_storage self.stationxml_storage = stationxml_storage self.logger = logger self.stations = {} self.is_availability_reliable = None
[docs] def __bool__(self): return bool(len(self))
[docs] def __str__(self): avail_map = { None: "Unknown reliability of availability information", True: "Reliable availability information", False: "Non-reliable availability information" } reliability = avail_map[self.is_availability_reliable] return ( "ClientDownloadHelper object for client '{client}' ({url})\n" "-> {reliability}\n" "-> Manages {station_count} stations.\n{stations}").format( client=self.client_name, url=self.client.base_url, reliability=reliability, station_count=len(self), stations="\n".join([str(_i) for _i in self.stations.values()]))
[docs] def __len__(self): return len(self.stations)
[docs] def prepare_mseed_download(self): """ Prepare each Station for the MiniSEED downloading stage. This will distribute filenames and identify files that require downloading. """ for station in self.stations.values(): station.prepare_mseed_download(mseed_storage=self.mseed_storage)
[docs] def filter_stations_based_on_minimum_distance( self, existing_client_dl_helpers): """ Removes stations until all stations have a certain minimum distance to each other. Returns the rejected stations which is mainly useful for testing. :param existing_client_dl_helpers: Instances of already existing client download helpers. :type existing_client_dl_helpers: list of :class:`~.ClientDownloadHelper` """ if not self.restrictions.minimum_interstation_distance_in_m: # No rejected stations. return [] # Create a sorted copy that will be used in the following. Make it # more deterministic by sorting the stations based on the id. stations = copy.copy(list(self.stations.values())) stations = sorted(stations, key=lambda x: (x.network, x.station)) existing_stations = [] for dlh in existing_client_dl_helpers: existing_stations.extend(list(dlh.stations.values())) remaining_stations = [] rejected_stations = [] # There are essentially two possibilities. If no station exists yet, # it will choose the largest subset of stations satisfying the # minimum inter-station distance constraint. if not existing_stations: # Build k-d-tree and query for the neighbours of each point within # the minimum distance. kd_tree = utils.SphericalNearestNeighbour(stations) nns = kd_tree.query_pairs( self.restrictions.minimum_interstation_distance_in_m) indexes_to_remove = [] # Keep removing the station with the most pairs until no pairs are # left. while nns: most_common = collections.Counter( itertools.chain.from_iterable(nns)).most_common()[0][0] indexes_to_remove.append(most_common) nns = list(filterfalse(lambda x: most_common in x, nns)) # Remove these indices this results in a set of stations we wish to # keep. new_remaining_stations = [_i[1] for _i in enumerate(stations) if _i[0] not in indexes_to_remove] new_rejected_stations = [_i[1] for _i in enumerate(stations) if _i[0] in indexes_to_remove] # Station objects are not hashable thus we have to go the long # route. for st in new_remaining_stations: if st not in remaining_stations: remaining_stations.append(st) for st in new_rejected_stations: if st not in rejected_stations: rejected_stations.append(st) # Otherwise it will add new stations approximating a Poisson disk # distribution. else: while stations: # kd-tree with all existing_stations existing_kd_tree = utils.SphericalNearestNeighbour( existing_stations) # Now we have to get the distance to the closest existing # station for all new stations. distances = np.ma.array(existing_kd_tree.query(stations)[0]) if np.isinf(distances[0]): break distances.mask = False # Step one is to get rid of all stations that are closer # than the minimum distance to any existing station. remove = np.where( distances < self.restrictions.minimum_interstation_distance_in_m)[0] rejected_stations.extend([stations[_i] for _i in remove]) keep = np.where( distances >= self.restrictions.minimum_interstation_distance_in_m)[0] distances.mask[remove] = True if len(keep): # Station with the largest distance to next closer station. largest = np.argmax(distances) remaining_stations.append(stations[largest]) existing_stations.append(stations[largest]) # Add all rejected stations here. stations = [stations[_i] for _i in keep if _i != largest] else: stations = [] # Now actually delete the files and everything of the rejected # stations. for station in rejected_stations: station.remove_files(logger=self.logger, reason="Minimum distance filtering.") self.stations = {} for station in remaining_stations: self.stations[(station.network, station.station)] = station # Return the rejected stations. return {(_i.network, _i.station): _i for _i in rejected_stations}
[docs] def prepare_stationxml_download(self): """ Prepare each Station for the StationXML downloading stage. This will distribute filenames and identify files that require downloading. """ for station in self.stations.values(): station.prepare_stationxml_download( stationxml_storage=self.stationxml_storage, logger=self.logger)
[docs] def download_stationxml(self, threads=3): """ Actually download the StationXML files. :param threads: Limits the maximum number of threads for the client. """ def star_download_station(args): """ Maps arguments to the utils.download_stationxml() function. :param args: The to-be mapped arguments. """ try: ret_val = utils.download_stationxml(*args, logger=self.logger) except utils.ERRORS as e: self.logger.error(str(e)) return None return ret_val # Build up everything we want to download. arguments = [] for station in self.stations.values(): if not station.miss_station_information: continue s, e = station.temporal_bounds if self.restrictions.station_starttime: s = self.restrictions.station_starttime if self.restrictions.station_endtime: e = self.restrictions.station_endtime bulk = [(station.network, station.station, channel.location, channel.channel, s, e) for channel in station.channels] arguments.append((self.client, self.client_name, bulk, station.stationxml_filename)) if not arguments: self.logger.info("Client '%s' - No station information to " "download." % self.client_name) return # Download it. s_time = timeit.default_timer() pool = ThreadPool(min(threads, len(arguments))) results = pool.map(star_download_station, arguments) pool.close() e_time = timeit.default_timer() results = [_i for _i in results if _i is not None] # Check it. filecount = 0 download_size = 0 # Update the station structures. Loop over each returned file. for s_id, filename in results: filecount += 1 station = self.stations[s_id] size = os.path.getsize(filename) download_size += size # Extract information about that file. try: info = utils.get_stationxml_contents(filename) # Sometimes some services choose to not return XML files - guard # against it and just delete the file. At subsequent runs the # mass downloader will attempt to download it again. except XMLSyntaxError: self.logger.info( "Client '%s' - File %s is not an XML file - it will be " "deleted." % (self.client_name, filename)) utils.safe_delete(filename) continue still_missing = {} # Make sure all missing information has been downloaded by # looping over each channel of the station that originally # requested to be downloaded. for c_id, times in station.miss_station_information.items(): # Get the temporal range of information in the file. c_info = [_i for _i in info if _i.network == station.network and _i.station == station.station and _i.location == c_id[0] and _i.channel == c_id[1]] if not c_info: continue starttime = min([_i.starttime for _i in c_info]) endtime = max([_i.endtime for _i in c_info]) if starttime > times[0] or endtime < times[1]: # Cope with case that not full day of station info missing if starttime < times[1]: still_missing[c_id] = (times[0], starttime) station.have_station_information[c_id] = (starttime, times[1]) elif endtime > times[0]: still_missing[c_id] = (endtime, times[1]) station.have_station_information[c_id] = (times[0], endtime) else: still_missing[c_id] = times continue station.have_station_information[c_id] = times station.miss_station_information = still_missing if still_missing: station.stationxml_status = STATUS.DOWNLOAD_PARTIALLY_FAILED else: station.stationxml_status = STATUS.DOWNLOADED # Now loop over all stations and set the status of the ones that # still need downloading to download failed. for station in self.stations.values(): if station.stationxml_status == STATUS.NEEDS_DOWNLOADING: station.stationxml_status = STATUS.DOWNLOAD_FAILED self.logger.info("Client '%s' - Downloaded %i station files [%.1f MB] " "in %.1f seconds [%.2f KB/sec]." % ( self.client_name, filecount, download_size / 1024.0 ** 2, e_time - s_time, (download_size / 1024.0) / (e_time - s_time)))
[docs] def download_mseed(self, chunk_size_in_mb=25, threads_per_client=3): """ Actually download MiniSEED data. :param chunk_size_in_mb: Attempt to download data in chunks of this size. :param threads_per_client: Threads to launch per client. 3 seems to be a value in agreement with some data centers. """ # Estimate the download size to have equally sized chunks. channel_sampling_rate = { "F": 5000, "G": 5000, "D": 1000, "C": 1000, "E": 250, "S": 80, "H": 250, "B": 80, "M": 10, "L": 1, "V": 0.1, "U": 0.01, "R": 0.001, "P": 0.0001, "T": 0.00001, "Q": 0.000001, "A": 5000, "O": 5000} # Split into chunks of about equal size in terms of filesize. chunks = [] chunks_curr = [] curr_chunks_mb = 0 # Don't request more than 50 chunks at once to not choke the servers. max_chunk_length = 50 counter = collections.Counter() # Keep track of attempted downloads. for sta in self.stations.values(): for cha in sta.channels: # The band code is used to estimate the sampling rate of the # data to be downloaded. band_code = cha.channel[0].upper() try: sr = channel_sampling_rate[band_code] except KeyError: # Generic sampling rate for exotic band codes. sr = 1.0 for interval in cha.intervals: counter[interval.status] += 1 # Only take those time intervals that actually require # some downloading. if interval.status != STATUS.NEEDS_DOWNLOADING: continue chunks_curr.append(( sta.network, sta.station, cha.location, cha.channel, interval.start, interval.end, interval.filename)) # Assume that each sample needs 4 byte, STEIM # compression reduces size to about a third. # chunk size is in MB duration = interval.end - interval.start curr_chunks_mb += \ sr * duration * 4.0 / 3.0 / 1024.0 / 1024.0 if curr_chunks_mb >= chunk_size_in_mb or \ len(chunks_curr) >= max_chunk_length: chunks.append(chunks_curr) chunks_curr = [] curr_chunks_mb = 0 if chunks_curr: chunks.append(chunks_curr) keys = sorted(counter.keys()) for key in keys: self.logger.info( "Client '%s' - Status for %i time intervals/channels before " "downloading: %s" % (self.client_name, counter[key], key.upper())) if not chunks: return def star_download_mseed(args): """ Star maps the arguments to the utils.download_and_split_mseed_bulk() function. :param args: The arguments to be passed. """ try: ret_val = utils.download_and_split_mseed_bulk( *args, logger=self.logger) except utils.ERRORS as e: msg = ("Client '%s' - " % args[1]) + str(e) if "no data available" in msg.lower(): self.logger.info(msg.split("Detailed response")[0].strip()) else: self.logger.error(msg) return [] return ret_val pool = ThreadPool(min(threads_per_client, len(chunks))) d_start = timeit.default_timer() pool.map( star_download_mseed, [(self.client, self.client_name, chunk) for chunk in chunks]) pool.close() d_end = timeit.default_timer() self.logger.info("Client '%s' - Launching basic QC checks..." % self.client_name) downloaded_bytes, discarded_bytes = self._check_downloaded_data() total_bytes = downloaded_bytes + discarded_bytes self.logger.info("Client '%s' - Downloaded %.1f MB [%.2f KB/sec] of " "data, %.1f MB of which were discarded afterwards." % (self.client_name, total_bytes / 1024.0 ** 2, total_bytes / 1024.0 / (d_end - d_start), discarded_bytes / 1024.0 ** 2)) # Recount everything to be able to emit some nice statistics. counter = collections.Counter() for sta in self.stations.values(): for chan in sta.channels: for interval in chan.intervals: counter[interval.status] += 1 keys = sorted(counter.keys()) for key in keys: self.logger.info( "Client '%s' - Status for %i time intervals/channels after " "downloading: %s" % ( self.client_name, counter[key], key.upper())) self._remove_failed_and_ignored_stations()
[docs] def _remove_failed_and_ignored_stations(self): """ Removes all stations that have no time interval with either exists or downloaded status. """ to_be_removed_keys = [] for key, station in self.stations.items(): if station.has_existing_or_downloaded_time_intervals is True: continue to_be_removed_keys.append(key) for key in to_be_removed_keys: del self.stations[key]
[docs] def sanitize_downloads(self): """ Should be run after the MiniSEED and StationXML downloads finished. It will make sure that every MiniSEED file also has a corresponding StationXML file. """ for station in self.stations.values(): station.sanitize_downloads(logger=self.logger)
[docs] def _check_downloaded_data(self): """ Read the downloaded data, set the proper status flags and remove data that does not meet the QC criteria. It just checks the downloaded data for minimum length and gaps/overlaps. Returns the downloaded_bytes and the discarded_bytes. """ downloaded_bytes = 0 discarded_bytes = 0 for sta in self.stations.values(): for cha in sta.channels: for interval in cha.intervals: # The status of the interval should not have changed if # it did not require downloading in the first place. if interval.status != STATUS.NEEDS_DOWNLOADING: continue # If the file does not exist, mark the time interval as # download failed. if not os.path.exists(interval.filename): interval.status = STATUS.DOWNLOAD_FAILED continue size = os.path.getsize(interval.filename) if size == 0: self.logger.warning("Zero byte file '%s'. Will be " "deleted." % interval.filename) utils.safe_delete(interval.filename) interval.status = STATUS.DOWNLOAD_FAILED continue # Guard against faulty files. try: st = obspy.read(interval.filename, headonly=True) except Exception as e: self.logger.warning( "Could not read file '%s' due to: %s\n" "Will be discarded." % (interval.filename, str(e))) utils.safe_delete(interval.filename) discarded_bytes += size interval.status = STATUS.DOWNLOAD_FAILED continue # Valid files with no data. if len(st) == 0: self.logger.warning( "Empty file '%s'. Will be deleted." % interval.filename) utils.safe_delete(interval.filename) discarded_bytes += size interval.status = STATUS.DOWNLOAD_FAILED continue # If user did not want gappy files, remove them. if self.restrictions.reject_channels_with_gaps is True and\ len(st) > 1: self.logger.info( "File '%s' has %i traces and thus contains " "gaps or overlaps. Will be deleted." % ( interval.filename, len(st))) utils.safe_delete(interval.filename) discarded_bytes += size interval.status = STATUS.DOWNLOAD_REJECTED continue if self.restrictions.minimum_length: duration = sum([tr.stats.endtime - tr.stats.starttime for tr in st]) expected_min_duration = \ self.restrictions.minimum_length * \ (interval.end - interval.start) if duration < expected_min_duration: self.logger.info( "File '%s' has only %.2f seconds of data. " "%.2f are required. File will be deleted." % (interval.filename, duration, expected_min_duration)) utils.safe_delete(interval.filename) discarded_bytes += size interval.status = STATUS.DOWNLOAD_REJECTED continue downloaded_bytes += size interval.status = STATUS.DOWNLOADED return downloaded_bytes, discarded_bytes
[docs] def _parse_miniseed_filenames(self, filenames, restrictions): time_range = restrictions.minimum_length * (restrictions.endtime - restrictions.starttime) channel_availability = [] for filename in filenames: st = obspy.read(filename, format="MSEED", headonly=True) if restrictions.reject_channels_with_gaps and len(st) > 1: self.logger.warning("Channel %s has gap or overlap. Will be " "removed." % st[0].id) try: os.remove(filename) except OSError: pass continue elif len(st) == 0: self.logger.error("MiniSEED file with no data detected. " "Should not happen!") continue tr = st[0] duration = tr.stats.endtime - tr.stats.starttime if restrictions.minimum_length and duration < time_range: self.logger.warning("Channel %s does not satisfy the minimum " "length requirement. %.2f seconds instead " "of the required %.2f seconds." % ( tr.id, duration, time_range)) try: os.remove(filename) except OSError: pass continue channel_availability.append(utils.ChannelAvailability( tr.stats.network, tr.stats.station, tr.stats.location, tr.stats.channel, tr.stats.starttime, tr.stats.endtime, filename)) return channel_availability
[docs] def discard_stations(self, existing_client_dl_helpers): """ Discard all stations part of any of the already existing client download helper instances. The station discarding happens purely based on station ids. :param existing_client_dl_helpers: Instances of already existing client download helpers. All stations part of this will not be downloaded anymore. :type existing_client_dl_helpers: list of :class:`~.ClientDownloadHelper` """ station_ids = [] for helper in existing_client_dl_helpers: station_ids.extend(helper.stations.keys()) for station_id in station_ids: try: del self.stations[station_id] except KeyError: pass
[docs] def get_availability(self): """ Queries the current client for information on what stations are available given the spatial and temporal restrictions. """ # Check if stations needs to be filtered after downloading or if the # restrictions one can impose with the FDSN webservices queries are # enough. This depends on the domain definition. try: self.domain.is_in_domain(0, 0) needs_filtering = True except NotImplementedError: needs_filtering = False arguments = { "network": self.restrictions.network, "station": self.restrictions.station, "location": self.restrictions.location, "channel": self.restrictions.channel, "starttime": self.restrictions.starttime, "endtime": self.restrictions.endtime, # Request at the channel level. "level": "channel" } # Add the domain specific query parameters. arguments.update(self.domain.get_query_parameters()) # Check the capabilities of the service and see what is the most # appropriate way of acquiring availability information. Some services # right now require manual overriding of what they claim to be # capable of. if "matchtimeseries" in self.client.services["station"]: arguments["matchtimeseries"] = True if "format" in self.client.services["station"]: arguments["format"] = "text" self.is_availability_reliable = True else: if "format" in self.client.services["station"]: arguments["format"] = "text" self.is_availability_reliable = False if self.is_availability_reliable: self.logger.info("Client '%s' - Requesting reliable " "availability." % self.client_name) else: self.logger.info( "Client '%s' - Requesting unreliable availability." % self.client_name) try: start = time.time() inv = self.client.get_stations(**arguments) end = time.time() except utils.ERRORS as e: if "no data available" in str(e).lower(): self.logger.info( "Client '%s' - No data available for request." % self.client_name) return self.logger.error( "Client '{0}' - Failed getting availability: %s".format( self.client_name), str(e)) return # This sometimes fires if a service returns some random stuff which # is not a valid station file. except Exception as e: self.logger.error( "Client '{0}' - Failed getting availability due to " "unexpected exception: %s".format(self.client_name), str(e)) return self.logger.info("Client '%s' - Successfully requested availability " "(%.2f seconds)" % (self.client_name, end - start)) # Get the time intervals from the restrictions. intervals = [TimeInterval(start=_i[0], end=_i[1]) for _i in self.restrictions] for network in inv: # Skip network if so desired. skip_network = False for pattern in self.restrictions.exclude_networks: if fnmatch.fnmatch(network.code, pattern): skip_network = True break if skip_network: continue for station in network: # Skip station if so desired. skip_station = False for pattern in self.restrictions.exclude_stations: if fnmatch.fnmatch(station.code, pattern): skip_station = True break if skip_station: continue # If an inventory is given, only keep stations part of the # inventory. if self.restrictions.limit_stations_to_inventory is not None \ and (network.code, station.code) not in \ self.restrictions.limit_stations_to_inventory: continue # Skip the station if it is not in the desired domain. if needs_filtering is True and \ not self.domain.is_in_domain(station.latitude, station.longitude): continue channels = [] for channel in station.channels: # Remove channels that somehow slipped past the temporal # constraints due to weird behaviour from the data center. if (channel.start_date > self.restrictions.endtime) or \ (channel.end_date < self.restrictions.starttime): continue new_channel = Channel( location=channel.location_code, channel=channel.code, intervals=copy.deepcopy(intervals)) # Multiple channel epochs would result in duplicate # channels which we don't want. Bit of a silly logic here # to get rid of them. if new_channel not in channels: channels.append(new_channel) if self.restrictions.channel is None: # Group by locations and apply the channel priority filter # to each. filtered_channels = [] def get_loc(x): return x.location for location, _channels in itertools.groupby( sorted(channels, key=get_loc), get_loc): filtered_channels.extend(utils.filter_channel_priority( list(_channels), key="channel", priorities=self.restrictions.channel_priorities)) channels = filtered_channels if self.restrictions.location is None: # Filter to remove unwanted locations according to the # priority list. has_channels_before_filtering = bool(channels) channels = utils.filter_channel_priority( channels, key="location", priorities=self.restrictions.location_priorities) # This has been a point of confusion for users so raise a # warning in case this removed all channels and is still # using the default settings. if not channels and has_channels_before_filtering and \ self.restrictions._loc_prios_are_default_values: self.logger.warning( "Client '%s' - No channel at station %s.%s has " "been selected due to the `location_priorities` " "settings." % (self.client_name, network.code, station.code)) if not channels: continue self.stations[(network.code, station.code)] = Station( network=network.code, station=station.code, latitude=station.latitude, longitude=station.longitude, channels=channels) self.logger.info("Client '%s' - Found %i stations (%i channels)." % ( self.client_name, len(self.stations), sum([len(_i.channels) for _i in self.stations.values()])))