Source code for moten.pyramids

'''
'''
#
# Adapted from MATLAB code written by S. Nishimoto (see Nishimoto, et al., 2011).
# Anwar O. Nunez-Elizalde (Jan, 2016)
#
# Updates:
#  Anwar O. Nunez-Elizalde (Apr, 2020)
#
#
# Implementation notes:
#
# The terminology here is "pyramid" and "filters".
# In moten.core, the termilogy is inconsistent.
#
# TODO: Fix terminology in moten.core to match API.
#
import numpy as np


from moten import (utils,
                   core,
                   viz,
                   )

__all__ = ['MotionEnergyPyramid',
           'StimulusMotionEnergy',
           'StimulusStaticGaborPyramid',
           'DefaultPyramids',
           ]

##############################
#
##############################

[docs]class MotionEnergyPyramid(object): '''Construct a motion energy pyramid that tiles the stimulus. Generates motion energy filters at the desired spatio-temporal frequencies and directions of motion. Multiple motion energy filters per spatio-temporal frequency are constructed and each is centered at different locations in the image in order to tile the stimulus. Parameters ---------- stimulus_vhsize : tuple of ints Size of the stimulus in pixels (vdim, hdim) stimulus_fps : scalar, [Hz] Stimulus playback speed in frames per second. spatial_frequencies : array-like, [cycles-per-image] Spatial frequencies for the filters spatial_directions : array-like, [degrees] Direction of filter motion. Degree position corresponds to standard unit-circle coordinates (i.e. 0=right, 180=left). temporal_frequencies : array-like, [Hz] Temporal frequencies of the filters filter_temporal_width : int Temporal window of the motion energy filter (e.g. 10). Defaults to approximately 0.666[secs] (`floor(stimulus_fps*(2/3))`). Attributes ---------- nfilters : int filters : list of dicts Each item is a dict defining a motion energy filter. Each of the `nfilters` has the following parameters: * centerv,centerh : y:vertical and x:horizontal position ('0,0' is top left) * direction : direction of motion [degrees] * spatial_freq : spatial frequency [cpi] * spatial_env : spatial envelope (gaussian s.d.) * temporal_freq : temporal frequency [Hz] * temporal_env : temporal envelope (gaussian s.d.) * filter_temporal_width : temporal window of filter [frames] * aspect_ratio : width/height * stimulus_fps : stimulus playback speed in frames per second * spatial_phase_offset : filter phase offset in [degrees] parameters : dict of arrays The individual parameter values across all filters. Each item is an array of length ``nfilters`` definition : dict Parameters used to define the pyramid. parameters_matrix : np.array, (nfilters, 11) Parameters that defined the motion energy filter. parameters_names : tuple of strings Notes ----- See :func:`moten.core.mk_moten_pyramid_params` for more details on making the pyramid. ''' def __init__(self, stimulus_vhsize=(576, 1024), stimulus_fps=24, temporal_frequencies=[0,2,4], spatial_frequencies=[0,2,4,8,16], spatial_directions=[0,45,90,135,180,225,270,315], sf_gauss_ratio=0.6, max_spatial_env=0.3, filter_spacing=3.5, tf_gauss_ratio=10., max_temp_env=0.3, include_edges=False, spatial_phase_offset=0.0, filter_temporal_width='auto'): '''...More parameters Parameters ---------- sf_gauss_ratio : scalar The ratio of spatial frequency to gaussian s.d. This controls the number of cycles in a filter max_spatial_env : scalar Defines the maximum s.d. of the gaussian gabor_spacing : scalar Defines the spacing between spatial gabors (in s.d. units) tf_gauss_ratio : scalar The ratio of temporal frequency to gaussian s.d. This controls the number of temporal cycles max_temp_env : scalar Defines the maximum s.d. of the temporal gaussian include_edges : bool Determines whether to include filters at the edge of the image which might be partially outside the stimulus field-of-view ''' vdim, hdim = stimulus_vhsize aspect_ratio = hdim/float(vdim) # same as stimulus if filter_temporal_width == 'auto': # default to 2/3 stimulus frame rate filter_temporal_width = int(stimulus_fps*(2/3.)) stimulus_vht_fov = (vdim, hdim, stimulus_fps) filter_vht_fov = (vdim, hdim, filter_temporal_width) # store pyramid parameters definition = utils.DotDict() for local_name, local_values in sorted(locals().items()): if local_name in ('self', 'definition') : continue definition[local_name] = local_values # sanity checks assert isinstance(stimulus_fps, int) assert isinstance(filter_temporal_width, int) # construct the gabor pyramid parameter_names, filter_params_array = core.mk_moten_pyramid_params( stimulus_fps, filter_temporal_width=filter_temporal_width, aspect_ratio=aspect_ratio, temporal_frequencies=temporal_frequencies, spatial_frequencies=spatial_frequencies, spatial_directions=spatial_directions, sf_gauss_ratio=sf_gauss_ratio, max_spatial_env=max_spatial_env, gabor_spacing=filter_spacing, tf_gauss_ratio=tf_gauss_ratio, max_temp_env=max_temp_env, include_edges=include_edges, spatial_phase_offset=spatial_phase_offset, ) nfilters = filter_params_array.shape[0] # make individual filters readable filters = [utils.DotDict({name : filter_params_array[idx, pdx] for pdx, name \ in enumerate(parameter_names)}) \ for idx in range(nfilters)] # parameters across filters parameters = utils.DotDict({name : filter_params_array[:,idx] for idx, name \ in enumerate(parameter_names)}) # store pyramid information as attributes self.filters = filters self.nfilters = nfilters self.parameters = parameters self.definition = definition self.parameters_matrix = filter_params_array self.parameters_names = parameter_names self.mask_threshold = 0.001 # HARDCODED. TODO: Make parameter. def __repr__(self): info = '<%s.%s [#%i filters (ntfq=%i, nsfq=%i, ndirs=%i) aspect=%0.03f]>' details = (__name__, type(self).__name__, self.nfilters, len(self.definition.temporal_frequencies), len(self.definition.spatial_frequencies), len(self.definition.spatial_directions), self.definition.aspect_ratio) return info%details
[docs] def filters_at_vhposition(self, centerv, centerh): '''Center spatio-temporal filters to requested vh-position. Parameters ---------- centerv : scalar Vertical filter position from top of frame (min=0, max=1.0). centerh : scalar Horizontal filter position from left of frame (min=0, max=aspect_ratio). Returns ------- centered_filters : list of dicts Spatio-temporal filter parameters at vh-position. ''' unique_parameters = np.unique(self.parameters_matrix[:, 2:], axis=0) nunique = unique_parameters.shape[0] new_filters = [utils.DotDict({name : unique_parameters[idx, pdx] for pdx, name \ in enumerate(self.parameters_names[2:])}) \ for idx in range(nunique)] for filt in new_filters: filt['centerh'] = centerh filt['centerv'] = centerv return new_filters
[docs] def get_filter_spatiotemporal_quadratures(self, filterid=0): '''Generate the spatial and temporal arrays that define the motion energy filter. Parameters ---------- filterid : int, or dict If int, the filter index. If dict, a filter dictionary definition Returns ------- spatial_gabor_sin : 2D np.ndarray, (vdim, hdim) spatial_gabor_cos : 2D np.ndarray, (vdim, hdim) Spatial gabor quadrature pair. ``spatial_gabor_cos`` has a 90 degree phase offset relative to ``spatial_gabor_sin`` temporal_gabor_sin : 1D np.ndarray, (`filter_temporal_width`,) temporal_gabor_cos : 1D np.ndarray, (`filter_temporal_width`,) Temporal gabor quadrature pair. ``temporal_gabor_cos`` has a 90 degree phase offset relative to ``temporal_gabor_sin`` ''' vhsize = (self.definition.vdim, self.definition.hdim) # extract parameters for this filterid if isinstance(filterid, dict): gabor_parameters_dict = filterid.copy() else: gabor_parameters_dict = self.filters[filterid] sgabor0, sgabor90, tgabor0, tgabor90 = core.mk_3d_gabor(vhsize, **gabor_parameters_dict) return sgabor0, sgabor90, tgabor0, tgabor90
[docs] def get_filter_temporal_quadrature(self, filterid=0): '''Generate the temporal arrays that define the motion energy filter. Parameters ---------- filterid : int, or dict If int, the filter index. If dict, a filter dictionary definition Returns ------- temporal_gabor_sin : 1D np.ndarray, (`filter_temporal_width`,) temporal_gabor_cos : 1D np.ndarray, (`filter_temporal_width`,) Temporal gabor quadrature pair. ``temporal_gabor_cos`` has a 90 degree phase offset relative to ``temporal_gabor_sin`` ''' _,_, tgabor0, tgabor90 = self.get_filter_spatiotemporal_quadratures(filterid=filterid) return tgabor0, tgabor90
[docs] def get_filter_spatial_quadrature(self, filterid=0): '''Generate the spatial arrays that define the motion energy filter. Parameters ---------- filterid : int, or dict If int, the filter index. If dict, a filter dictionary definition Returns ------- spatial_gabor_sin : 2D np.ndarray, (vdim, hdim) spatial_gabor_cos : 2D np.ndarray, (vdim, hdim) Spatial gabor quadrature pair. ``spatial_gabor_cos`` has a 90 degree phase offset relative to ``spatial_gabor_sin`` ''' sgabor0, sgabor90, _, _ = self.get_filter_spatiotemporal_quadratures(filterid=filterid) return sgabor0, sgabor90
[docs] def get_filter_mask(self, filterid=0): '''Generate a mask of the filter in the image Parameters ---------- filterid : int, or dict If int, the filter index. If dict, a filter dictionary definition Returns ------- mask : 2D np.ndarray, (vdim, hdim) Filter spatial mask ''' abs = np.abs threshold = self.mask_threshold spsin, spcos = self.get_filter_spatial_quadrature(filterid) mask = (abs(spsin) + abs(spcos)) > threshold return mask
[docs] def get_filter_pixel_sizes(self): '''Measure the size of the each filter in the pyramid in pixels. Returns ------- filter_npixels : 2D np.ndarray, (nfilters,) Array containing the size of each filter in pixels. ''' npixels = [] for filter_idx in range(self.nfilters): mask = self.get_filter_mask(filter_idx) npixels.append(mask.sum()) return npixels
[docs] def show_filter(self, filterid=0, speed=1.0, background=None): '''Display the motion energy filter as an animation. Parameters ---------- filterid : int, or dict If int, it's the index of the filter to display. If dict, it's a motion energy filter definition. Returns ------- animation : matplotlib.animation Examples -------- >>> import moten >>> pyramid = moten.get_default_pyramid() >>> _ = pyramid.show_filter(12) >>> custom_filter = {'centerh': 0.8888888888888888, 'centerv': 0.5, 'direction': 45.0, 'spatial_freq': 2.0, 'spatial_env': 0.3, 'temporal_freq': 2.0, 'temporal_env': 0.3, 'filter_temporal_width': 16.0, 'aspect_ratio': 1.7777777777777777, 'stimulus_fps': 24.0, 'spatial_phase_offset': 0.0} >>> _ = pyramid.show_filter(custom_filter) ''' # Get dimensions of movie vdim, hdim, stimulus_fps = self.definition.stimulus_vht_fov aspect_ratio = self.definition.aspect_ratio filter_width = self.definition.filter_temporal_width vhsize = (vdim, hdim) # extract parameters for this filterid if isinstance(filterid, dict): gabor_params_dict = filterid.copy() filterid = 0 else: gabor_params_dict = self.filters[filterid] # construct title from parameters title = '' for pdx, (pname, pval) in enumerate(sorted(gabor_params_dict.items())): title += '%s=%0.02f, '%(pname, pval) if np.mod(pdx, 3) == 0 and pdx > 0: title += '\n' return viz.plot_3dgabor(vhsize, gabor_params_dict, background=background, title=title, speed=speed)
[docs] def project_stimulus(self, stimulus, filters='all', quadrature_combination=utils.sqrt_sum_squares, output_nonlinearity=utils.log_compress, dtype='float32', use_cuda=False): '''Compute the motion energy filter responses to the stimulus. Parameters ---------- stimulus : np.ndarray, (nimages, vdim, hdim) or (nimages, npixels) The movie frames. If ``stimulus`` is two-dimensional with shape (nimages, npixels), then ``vhsize=(vdim,hdim)`` is required and `npixels == vdim*hdim`. filters : optional, 'all' or list of dicts By default compute the responses for all filters. Otherwise, provide a list of filter definitions to use. quadrature_combination : function, optional Specifies how to combine the channel reponse quadratures. The function must take the sin and cos as arguments in that order. Defaults to: :math:`(sin^2 + cos^2)^{1/2}` output_nonlinearity : function, optional Passes the channels (after ``quadrature_combination``) through a non-linearity. The function input is the (`nimages, nfilters`) array. Defaults to: :math:`log(x + 1e-05)` Returns ------- filter_responses : np.ndarray, (nimages, nfilters) ''' if filters == 'all': filters = self.filters vdim, hdim = self.definition.stimulus_vhsize output = core.project_stimulus(stimulus, filters, quadrature_combination=quadrature_combination, output_nonlinearity=output_nonlinearity, vhsize=(vdim, hdim), dtype=dtype) return output
[docs] def raw_project_stimulus(self, stimulus, filters='all', dtype='float32'): '''Obtain responses to the stimulus from all filter quadrature-pairs. Parameters ---------- stimulus : np.array, (nimages, vdim, hdim) The movie frames. filters : optional, 'all' or list of dicts By default compute the responses for all filters. Otherwise, provide a list of filter definitions to use. Returns ------- output_sin : np.ndarray, (nimages, nfilters) output_cos : np.ndarray, (nimages, nfilters) ''' if filters == 'all': filters = self.filters vdim, hdim = self.definition.stimulus_vhsize output_sin, output_cos = core.raw_project_stimulus(stimulus, filters, vhsize=(vdim, hdim), dtype=dtype) return output_sin, output_cos
[docs]class StimulusMotionEnergy(object): '''Parametrize a motion energy pyramid that tiles the stimulus. Generates motion energy filters at the desired spatio-temporal frequencies and directions of motion. Multiple motion energy filters per spatio-temporal frequency are constructed and each is centered at different locations in the image in order to tile the stimulus. Parameters ---------- stimulus : np.array, (nimages, vdim, hdim) The movie frames. stimulus_fps : scalar, [Hz] Stimulus playback speed in frames per second. spatial_frequencies : array-like, [cycles-per-image] Spatial frequencies for the filters spatial_directions : array-like, [degrees] Direction of filter motion. Degree position corresponds to standard unit-circle coordinates (i.e. 0=right, 180=left). temporal_frequencies : array-like, [Hz] Temporal frequencies of the filters filter_temporal_width : int Temporal window of the motion energy filter (e.g. 10). Defaults to approximately 0.666[secs] (`floor(stimulus_fps*(2/3))`). Attributes ---------- stimulus : 2D np.ndarray, (nimages, vdim*hdim) Time-space representation of stimulus view : :class:`MotionEnergyPyramid` Full description of the motion energy pyramid. nimages : int Number of video frames nfilters : int aspect_ratio : scalar, (hdim/vdim) stimulus_fps : int (fps) stimulus_vhsize : tuple of ints, (vdim, hdim) original_stimulus : 3D np.ndarray, (nimages, vdim, hdim) ''' def __init__(self, stimulus, stimulus_fps, temporal_frequencies=[0,2,4], spatial_frequencies=[0,2,4,8,16], spatial_directions=[0,45,90,135,180,225,270,315], sf_gauss_ratio=0.6, max_spatial_env=0.3, filter_spacing=3.5, tf_gauss_ratio=10., max_temp_env=0.3, include_edges=False, spatial_phase_offset=0.0, filter_temporal_width='auto'): ''' Notes ----- See :class:`MotionEnergyPyramid` for more detail on keyword arguments used to construct the motion energy pyramid. ''' nimages, vdim, hdim = stimulus.shape stimulus_vhsize = (vdim, hdim) original_stimulus = stimulus stimulus = stimulus.reshape(stimulus.shape[0], -1) aspect_ratio = hdim/vdim pyramid = MotionEnergyPyramid(stimulus_vhsize=(vdim, hdim), stimulus_fps=stimulus_fps, filter_temporal_width=filter_temporal_width, temporal_frequencies=temporal_frequencies, spatial_frequencies=spatial_frequencies, spatial_directions=spatial_directions, sf_gauss_ratio=sf_gauss_ratio, max_spatial_env=max_spatial_env, filter_spacing=filter_spacing, tf_gauss_ratio=tf_gauss_ratio, max_temp_env=max_temp_env, spatial_phase_offset=spatial_phase_offset, include_edges=include_edges) self.nimages = nimages self.stimulus = stimulus self.stimulus_fps = stimulus_fps self.stimulus_vhsize = stimulus_vhsize self.original_stimulus = original_stimulus self.aspect_ratio = aspect_ratio self.view = pyramid self.nfilters = pyramid.nfilters def __repr__(self): info = '<%s.%s [%i nframes @ %i fps. #%i filters. aspect=%0.03f]>' details = (__name__, type(self).__name__, self.nimages, self.stimulus_fps, self.nfilters, self.aspect_ratio) return info%details
[docs] def project_stimulus(self, *args, **kwargs): '''See :meth:`MotionEnergyPyramid.project_stimulus` ''' filter_responses = self.view.project_stimulus(*args, **kwargs) return filter_responses
[docs] def project(self, filters='all', quadrature_combination=utils.sqrt_sum_squares, output_nonlinearity=utils.log_compress, dtype='float32'): '''Compute the motion energy filter responses to the stimulus. Parameters ---------- filters : optional, 'all' or list of dicts By default compute the responses for all filters. Otherwise, provide a list of filter definitions to use. quadrature_combination : function, optional Specifies how to combine the channel reponse quadratures. The function must take the sin and cos as arguments in that order. Defaults to: :math:`(sin^2 + cos^2)^{1/2}` output_nonlinearity : function, optional Passes the channels (after ``quadrature_combination``) through a non-linearity. The function input is the (`nimages, nfilters`) array. Defaults to: :math:`log(x + 1e-05)` Returns ------- filter_responses : np.ndarray, (nimages, nfilters) ''' if filters == 'all': filters = self.view.filters stimulus = self.original_stimulus filter_responses = self.view.project_stimulus( stimulus, filters=filters, quadrature_combination=quadrature_combination, output_nonlinearity=output_nonlinearity, dtype=dtype) return filter_responses
[docs] def project_at_vhposition(self, centerv, centerh, quadrature_combination=utils.sqrt_sum_squares, output_nonlinearity=utils.log_compress, dtype='float32'): '''Center filters at vh-position and compute their response to the stimulus. Parameters ---------- centerv : scalar Vertical filter from top of frame (min=0, max=1.0). centerh : scalar Horizontal filter position from left of frame (min=0, max=aspect_ratio). Returns ------- centered_filters : list of dicts Spatio-temporal filter parameters at vh-position. filter_responses : np.ndarray, (nimages, len(vh_centered_filters)) ''' # center all spatial and temporal filters at desired vh-position filters = self.view.filters_at_vhposition(centerv, centerh) stimulus = self.original_stimulus filter_responses = self.view.project_stimulus( stimulus, filters=filters, quadrature_combination=quadrature_combination, output_nonlinearity=output_nonlinearity, dtype=dtype) return filters, filter_responses
[docs] def raw_projection(self, filters='all', dtype='float32'): '''Obtain responses to the stimulus from all filter quadrature-pairs. Parameters ---------- filters : optional, 'all' or list of dicts By default compute the responses for all filters. Otherwise, provide a list of filter definitions to use. Returns ------- output_sin : np.ndarray, (nimages, nfilters) output_cos : np.ndarray, (nimages, nfilters) ''' if filters == 'all': filters = self.view.filters stimulus = self.original_stimulus output_sin, output_cos = self.view.raw_project_stimulus( stimulus, filters=filters, dtype=dtype) return output_sin, output_cos
[docs]class StimulusStaticGaborPyramid(StimulusMotionEnergy): ''' ''' def __init__(self, stimulus, spatial_frequencies=[0,2,4,8,16], spatial_orientations=(0,45,90,135), sf_gauss_ratio=0.6, max_spatial_env=0.3, filter_spacing=3.5, include_edges=False, spatial_phase_offset=0.0, ): super(type(self), self).__init__(stimulus, spatial_frequencies=spatial_frequencies, spatial_directions=spatial_orientations, spatial_phase_offset=spatial_phase_offset, include_edges=include_edges, sf_gauss_ratio=sf_gauss_ratio, max_spatial_env=max_spatial_env, filter_spacing=filter_spacing, # fixed parameters for static filters: max_temp_env=np.inf, stimulus_fps=1, temporal_frequencies=[0], filter_temporal_width=1, tf_gauss_ratio=10)
[docs]class DefaultPyramids(object): ''' ''' def __init__(self): pass @property def pyramid15fps512x512(self): pyramid = MotionEnergyPyramid(stimulus_vhsize=(512, 512), stimulus_fps=15) return pyramid @property def pyramid15fps96x96(self): pyramid = MotionEnergyPyramid(stimulus_vhsize=(96, 96), stimulus_fps=15) return pyramid @property def pyramid24fps800x600(self): pyramid = MotionEnergyPyramid(stimulus_vhsize=(600, 800), stimulus_fps=24) return pyramid @property def pyramid24fps640x480(self): pyramid = MotionEnergyPyramid(stimulus_vhsize=(480, 640), stimulus_fps=24) return pyramid @property def pyramid24fps128x72(self): pyramid = MotionEnergyPyramid(stimulus_vhsize=(72, 128), stimulus_fps=24) return pyramid @property def pyramid24fps256x144(self): pyramid = MotionEnergyPyramid(stimulus_vhsize=(144, 256), stimulus_fps=24) return pyramid