diff --git a/neo/rawio/intanrawio.py b/neo/rawio/intanrawio.py index e7d668f14..5f2fef7a5 100644 --- a/neo/rawio/intanrawio.py +++ b/neo/rawio/intanrawio.py @@ -530,6 +530,57 @@ def _demultiplex_digital_data(self, raw_digital_data, channel_ids, i_start, i_st output[:, channel_index] = demultiplex_data[i_start:i_stop].flatten() return output + + def get_intan_timestamps(self, i_start=None, i_stop=None): + """ + Retrieves the sample indices from the Intan raw data within a specified range. + + Note that sample indices are called timestamps in the Intan format but they are + in fact just sample indices. This function extracts the sample index timestamps + from Intan files, which represent relative time points in sample units (not absolute time). + These indices can be particularly useful when working with recordings that have discontinuities. + + Parameters + ---------- + i_start : int, optional + The starting index from which to retrieve sample indices. If None, starts from 0. + i_stop : int, optional + The stopping index up to which to retrieve sample indices (exclusive). + If None, retrieves all available indices from i_start onward. + + Returns + ------- + timestamps : ndarray + The flattened array of sample indices within the specified range. + + Notes + ----- + - Sample indices can be converted to seconds by dividing by the sampling rate of the amplifier stream. + - The function automatically handles different file formats: + * header-attached: Timestamps are extracted directly from the timestamp field + * one-file-per-signal: Timestamps are read from the timestamp stream + * one-file-per-channel: Timestamps are read from the first channel in the timestamp stream + - When recordings have discontinuities (indicated by the `discontinuous_timestamps` + attribute being True), these indices allow for proper temporal alignment of the data. + """ + if i_start is None: + i_start = 0 + + # Get the timestamps based on file format + if self.file_format == "header-attached": + timestamps = self._raw_data["timestamp"] + elif self.file_format == "one-file-per-signal": + timestamps = self._raw_data["timestamp"] + elif self.file_format == "one-file-per-channel": + timestamps = self._raw_data["timestamp"][0] + + # TODO if possible ensure that timestamps memmaps are always of correct shape to avoid memory copy here. + timestamps = timestamps.flatten() if timestamps.ndim > 1 else timestamps + + if i_stop is None: + return timestamps[i_start:] + else: + return timestamps[i_start:i_stop] def _assert_timestamp_continuity(self): """ @@ -545,26 +596,11 @@ def _assert_timestamp_continuity(self): NeoReadWriteError If timestamps are not continuous and `ignore_integrity_checks` is False. The error message includes a table detailing the discontinuities found. - - Notes - ----- - The method extracts timestamps from the raw data based on the file format: - - * **header-attached:** Timestamps are extracted from a 'timestamp' field in the raw data. - * **one-file-per-signal:** Timestamps are taken from the last stream. - * **one-file-per-channel:** Timestamps are retrieved from the first channel of the last stream. """ # check timestamp continuity - if self.file_format == "header-attached": - timestamp = self._raw_data["timestamp"].flatten() - - # timestamps are always last stream for headerless binary files - elif self.file_format == "one-file-per-signal": - timestamp = self._raw_data["timestamp"] - elif self.file_format == "one-file-per-channel": - timestamp = self._raw_data["timestamp"][0] + timestamps = self.get_intan_timestamps() - discontinuous_timestamps = np.diff(timestamp) != 1 + discontinuous_timestamps = np.diff(timestamps) != 1 timestamps_are_not_contiguous = np.any(discontinuous_timestamps) if timestamps_are_not_contiguous: # Mark a flag that can be checked after parsing the header to see if the timestamps are continuous or not @@ -582,8 +618,8 @@ def _assert_timestamp_continuity(self): amplifier_sampling_rate = self._global_info["sampling_rate"] for discontinuity_index in np.where(discontinuous_timestamps)[0]: - prev_ts = timestamp[discontinuity_index] - next_ts = timestamp[discontinuity_index + 1] + prev_ts = timestamps[discontinuity_index] + next_ts = timestamps[discontinuity_index + 1] time_diff = (next_ts - prev_ts) / amplifier_sampling_rate error_msg += (