Source code for mousestyles.intervals

""" Finite Union of Intervals

Implements a class to handle finite (disjoint) unions of intervals.

* assumes that intervals are always closed and that the union is disjoint
* open intervals remaining at the end of any operations (eg. complement)
* are always made closed.  e.g. [0,1]^C = [-np.inf,0] [1,np.inf]
* end intervals being unbounded is handled using -np.inf and np.inf
* does some okay handling for point intervals [a,a]

Darren Rhea, 2012; Chris Hillar revised, April 30, 2013;
Ram Mehta revised, 2013; Copyright (c) 2013, All rights reserved;
Chris Hillar revised, 2015
"""
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)

import numpy as np


[docs]class Intervals(object): """ Finite Union of Intervals [ai,bi] backed by sorted lists. parameters intervals: (M x 2) numpy np.double array """ def __init__(self, intervals=None): if intervals is None or len(intervals) == 0: self.intervals = np.array([]) return intervals = np.atleast_2d(np.asarray(intervals)) idx = intervals[:, 0].argsort() self.intervals = intervals[idx, :] if not self._is_disjoint(): self._make_disjoint() def __iter__(self): return iter(self.intervals) def __len__(self): return self.measure() def __str__(self): if self.is_empty(): return "EmptySet" ivt = self.intervals return " ".join(["[%s,%s]" % (ivt[i, 0], ivt[i, 1]) for i in range(ivt.shape[0])]) def __add__(self, F): return self.union(F) def __mul__(self, F): return self.intersect(F) def __sub__(self, F): return self.remove(F) def __invert__(self): return self.complement() def _is_disjoint(self): """ Checks if intervals are indeed disjoint """ self.intervals = np.atleast_2d(self.intervals) if (self.intervals is None) or (self.intervals.shape[0] < 2): return True return ((self.intervals[:-1, 1] < self.intervals[1:, 0]).all() and (self.intervals[:, 1] >= self.intervals[:, 0]).all()) def _make_disjoint(self): """ Remove intervals [a, b] with a > b """ good_intervals_idx = self.intervals[:, 1] >= self.intervals[:, 0] self.intervals = self.intervals[good_intervals_idx, :] # union together intervals that overlap tot = None curr_idx = 0 curr_lhs = self.intervals[curr_idx, 0] curr_rhs = self.intervals[curr_idx, 1] while curr_idx < self.intervals.shape[0]: while curr_rhs >= self.intervals[curr_idx, 0]: curr_rhs = max(curr_rhs, self.intervals[curr_idx, 1]) curr_idx += 1 if curr_idx >= self.intervals.shape[0]: break if tot is None: tot = np.array([curr_lhs, curr_rhs]) else: tot = np.vstack((tot, np.array([curr_lhs, curr_rhs]))) if curr_idx < self.intervals.shape[0]: curr_lhs = self.intervals[curr_idx, 0] curr_rhs = self.intervals[curr_idx, 1] self.intervals = np.atleast_2d(tot)
[docs] def copy(self): return Intervals(self.intervals.copy())
[docs] def contains(self, x): """ Check if x is in the Finite Union of Intervals. """ if self.is_empty(): return False idx = self.intervals[:, 0].searchsorted(x) if idx == 0: return x >= self.intervals[idx, 0] if idx >= self.intervals.shape[0]: return x <= self.intervals[idx - 1, 1] if (self.intervals[idx - 1, 0] <= x) and \ (self.intervals[idx - 1, 1] >= x): return True if (self.intervals[idx, 0] <= x) and (self.intervals[idx, 1] >= x): return True return False
[docs] def index_of_first_intersection(self, x, find_nearest=False): """ finds interval nearest to given number x and containing x if find_nearest=False: doesn't require x to be in the interval """ if self.is_empty(): return -1 idx = self.intervals[:, 0].searchsorted(x) if idx == 0: if x >= self.intervals[idx, 0]: return idx else: return -1 if find_nearest: return idx if idx >= self.intervals.shape[0]: if x <= self.intervals[idx - 1, 1]: return -1 else: return idx - 1 if (self.intervals[idx - 1, 0] <= x) and \ (self.intervals[idx - 1, 1] >= x): return idx - 1 if (self.intervals[idx, 0] <= x) and (self.intervals[idx, 1] >= x): return idx - 1 return -1
[docs] def is_empty(self): return self.intervals.shape[0] == 0
[docs] def num(self): if self.is_empty(): return 0 return self.intervals.shape[0]
[docs] def union(self, F): """ New Intervals object which is the union of self and Intervals F. """ if F.is_empty(): return self if self.is_empty(): return F return Intervals(np.vstack((self.intervals, F.intervals)))
[docs] def intersect(self, F): """ New Intervals object which is the intersection of self and Intervals F. """ if F.is_empty(): return F if self.is_empty(): return self return ~(~self + ~F)
[docs] def intersect_with_interval(self, a, b): """ returns (not a copy) Intervals object which is the intersection of self and [a, b] (faster than intersect) """ if self.is_empty(): return self if (self.intervals[:, 1] > a).sum() == 0 or \ (self.intervals[:, 0] < b).sum() == 0: return Intervals() idx_first_gta = (self.intervals[:, 1] > a).nonzero()[0][0] idx_last_ltb = self.intervals.shape[ 0] - (self.intervals[:, 0] < b)[::-1].nonzero()[0][0] return Intervals(self.intervals[idx_first_gta:idx_last_ltb, :])
[docs] def complement(self): """ New Intervals object which is the complement of self. """ if self.is_empty(): return Intervals([-np.inf, np.inf]) M = self.intervals.shape[0] # complement bulk # 2 * M - 2 points list_endpoints = self.intervals.ravel() I = np.zeros((M - 1, 2)) odds = range(1, (M - 1) * 2, 2) evens = range(0, (M - 1) * 2, 2) I[:, 1] = list_endpoints[1: -1][odds] I[:, 0] = list_endpoints[1: -1][evens] # fix complement ends a, b = list_endpoints[0], list_endpoints[-1] if a > -np.inf: I = np.vstack((np.array([-np.inf, a]), I)) if b < np.inf: I = np.vstack((I, np.array([b, np.inf]))) return Intervals() if I.shape[0] == 0 else Intervals(I)
[docs] def measure(self): if self.is_empty(): return 0 diff_arr = self.intervals[:, 1] - self.intervals[:, 0] return diff_arr.sum()
[docs] def trim(self, eps=0.001): """ Removes intervals with lengths <= eps. """ if self.is_empty(): return self diff_arr = self.intervals[:, 1] - self.intervals[:, 0] idx = diff_arr > eps self.intervals = self.intervals[idx, :] return self
[docs] def connect_gaps(self, eps=0.001): """ connects consecutive intervals separated by lengths <= eps """ H = ~self diff_arr = H.intervals[:, 1] - H.intervals[:, 0] idx = diff_arr <= eps if idx.sum() == 0: return self B = Intervals(H.intervals[idx, :]) A = self.union(B) self.intervals = A.intervals return self
[docs] def connect_gaps_by_rule(self, rule): """ Returns a new object with gaps connected when rule returns True. Parameters rule: Callable that takes parameters start_time and end_time. """ if self.is_empty(): return self new_intervals = [] i = 0 dim = self.intervals.shape[0] while i < dim: a = self.intervals[i, 0] while i + 1 < dim and rule(self.intervals[i, 1], self.intervals[i + 1, 0]): i += 1 b = self.intervals[i, 1] new_intervals.append([a, b]) i += 1 return Intervals(np.array(new_intervals))
[docs] def remove(self, other): return self.intersect(~other)
[docs] def symmetric_difference(self, other): return (self - other) + (other - self)
[docs] def subordinate_to_array(self, arr): """ returns a new Intervals object with only intervals containing elements of arr (NOTE: arr is assumed sorted) """ arr = np.array(arr) F = Intervals() for interval in self.intervals: a, b = interval[0], interval[1] idxa = arr.searchsorted(a) idxb = arr.searchsorted(b) # arr has a point in interval if idxa != idxb or (idxa < len(arr) and a == arr[idxa]): F += Intervals([a, b]) return F
[docs] def save(self, filename='Intervals_save'): np.savez(filename, intervals=self.intervals)
[docs] def load(self, filename='Intervals_save.npz'): import os if os.path.exists(filename): arr = np.load(filename) self.intervals = arr['intervals'] return self
[docs] def ASs(self, ISDT=20): """ returns new object of Active States given self as Events """ return self.complement().trim(ISDT).complement()
[docs] def ISs(self, ISDT=20): """ returns new object of Inactive States given self as Events """ return self.complement().trim(ISDT)
[docs]def intervals_from_binary(bin_array, times): """ Given a one dimensional bin_array of 0s and 1s, returns a Intervals object of times corresponding to consecutives 1s """ F = Intervals() idx = 0 while idx < bin_array.shape[0]: curr_bit = bin_array[idx] if curr_bit == 0: idx += 1 else: AS_idx = idx + 1 start_idx = idx while AS_idx < bin_array.shape[0] and bin_array[AS_idx] == 1: AS_idx += 1 F = F.union(Intervals([times[start_idx], times[AS_idx - 1]])) idx = AS_idx return F
[docs]def binary_from_intervals(intervals, length=None): """ From an intervals object produce a binary sequence of size length """ if length is None: length = int(intervals.intervals[-1, 1] - intervals.intervals[0, 0]) binary = np.zeros(length) start = intervals.intervals[0, 0] end = intervals.intervals[-1, 1] arr = np.linspace(start, end, length) for c, time in enumerate(arr): if intervals.contains(time): binary[c] = 1 return binary
[docs]def timestamps_to_interval(array, eps=.01): """ given a 1D array with event timestamps, returns an interval centered on timestamp and eps wide. default 0.01 is half of minimum HCM sampling rate """ new_arr = zip(array - eps, array + eps) new_I = Intervals(new_arr) return new_I