Rules

Upload rules for lab data management.

This module provides rules for processing data uploads to the database. Rules handle validation, preprocessing, and storage of different data types.

Default rules included:

  • UploadRule: Base rule for generic file uploads
  • EphysRule: Rule for electrophysiology data (SpikeGLX)
  • TwoPhotonRule: Rule for two-photon microscopy data (ScanImage/Scanbox)
  • OnePhotonRule: Rule for one-photon imaging data (Widefield - labcams)
  • MiniscopeRule: Rule for (UCLA) Miniscope imaging data
  • FixedBrainRule: Rule for fixed tissue microscopy data
  • ReplaceRule: Rule for replacing existing files

Custom rules can be added to the user_preferences.json configuration.

UploadRule

Source code in labdata/rules/utils.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
class UploadRule():
    '''
    Base rule applied when uploading data to storage.

        1) Checksum on the files; compare with provided (reserve job if use_db)
        2) Apply function (_apply_rule)
        3) Checksum on the output - the files that changed
        4) Submit upload (_upload)
        5) Update tables (_post_upload)

    Subclasses typically override ``_apply_rule`` and ``_post_upload``.
    Can submit job on slurm, some of these can be long or take resources.
    '''
    def __init__(self, job_id, prefs = None):
        '''Prepare a rule for a specific UploadJob.

        job_id : primary key of the UploadJob row to process.
        prefs  : preferences dict; defaults to the package-level preferences.
        '''
        self.rule_name = 'default'

        self.job_id = job_id
        self.src_paths = None        # DataFrame of files assigned to this job
        self.processed_paths = None  # source files consumed by processing (tracked so deletions are safe)
        self.dst_paths = None
        self.inserted_files = []     # rows inserted in the File table during _upload
        if prefs is None:
            from ..utils import prefs
        self.prefs = prefs
        self.local_path = self.prefs['local_paths'][0]
        self.dataset_key = None   # will get written on upload, use in _post_upload
        self.max_concurrent = -1  # maximum number of concurrent UploadJobs that can run [-1 is infinite].
        self.schema = None

    def apply(self):
        '''Run the rule end-to-end: reserve the job, verify checksums,
        process, upload, and update the database tables.

        Returns the paths produced by ``_apply_rule`` on success, or None
        when the job cannot be taken or a step fails (failures are logged
        to the UploadJob table).
        '''
        from ..schema import UploadJob, File, dj
        if self.job_id is not None:
            with dj.conn().transaction:
                self.jobquery = (UploadJob() & dict(job_id = self.job_id))
                job_status = self.jobquery.fetch(as_dict = True)
                if len(job_status):
                    if job_status[0]['job_waiting']:
                        # check here if there are other jobs running for the same rule on this host
                        if self.max_concurrent > 0:
                            kk = dict(job_host = self.prefs['hostname'],
                                      job_waiting = 0,
                                      job_status = 'WORKING',
                                      job_rule = None)
                            if not self.rule_name == 'default':
                                kk['job_rule'] = self.rule_name
                            number_of_running_jobs = len(UploadJob() & kk)
                            if number_of_running_jobs >= self.max_concurrent:
                                print(f"Job {self.job_id} can not run because there are {number_of_running_jobs} for the rule {self.rule_name} [limit:{self.max_concurrent}].")
                                self.set_job_status(job_status = 'WAITING', job_waiting = 1,
                                                    job_log = f'Waiting for {number_of_running_jobs} running {self.rule_name} to complete [limit:{self.max_concurrent}].')
                                return
                        self.set_job_status(job_status = 'WORKING', job_starttime = datetime.now(), job_waiting = 0) # take the job
                        self.schema = load_project_schema(job_status[0]['project_name'])
                    else:
                        print(f"Job {self.job_id} is already taken.")
                        return # exit.
                else:
                    raise ValueError(f'job_id {self.job_id} does not exist.')
        # get the paths assigned to this job
        self.src_paths = pd.DataFrame((self.schema.UploadJob.AssignedFiles() & dict(job_id = self.job_id)).fetch())
        if not len(self.src_paths):
            self.set_job_status(job_status = 'FAILED',
                                job_log = f'Could not find files for {self.job_id} in Upload.AssignedFiles.')
            raise ValueError(f'Could not find files for {self.job_id} in Upload.AssignedFiles.')
        # NOTE(review): self.jobquery is only set when job_id is not None; this
        # path assumes a valid job_id — confirm callers never pass None here.
        self.upload_storage = self.jobquery.fetch('upload_storage')[0]
        if self.upload_storage is None:
            if 'upload_storage' in self.prefs.keys():
                self.upload_storage = self.prefs['upload_storage']

        # this should not fail because we have to keep track of errors, should update the table
        src = [Path(self.local_path) / p for p in self.src_paths.src_path.values]
        try:
            comparison = compare_md5s(src, self.src_paths.src_md5.values)
        except Exception as err:
            print('File not found for {0}?'.format(Path(self.src_paths.src_path.iloc[0]).parent))
            self.set_job_status(job_status = 'FAILED', job_log = f'FILE NOT FOUND; check file transfer {err}.')
            return

        if not comparison:
            print('CHECKSUM FAILED for {0}'.format(Path(self.src_paths.src_path.iloc[0]).parent))
            self.set_job_status(job_status = 'FAILED', job_log = 'MD5 CHECKSUM failed; check file transfer.')
            return # exit.
        import traceback
        try:
            paths = self._apply_rule() # can use the src_paths
            self._upload()             # compare the hashes after
        except Exception as err:
            # log the error so the failure is visible in the UploadJob table
            print('There was an error processing or uploading this dataset.')
            print(err)
            self.set_job_status(job_status = 'FAILED', job_log = f'ERROR uploading {traceback.format_exc()}')
            return
        try:
            self._post_upload()        # so the rules can insert tables and all.
        except Exception as err:
            # log the error
            print('There was an error with the post-upload this dataset.')
            print(err)
            self.set_job_status(job_status = 'FAILED', job_log = f'POST-UPLOAD: {traceback.format_exc()}')
            return
        return paths # so apply can output paths.

    def set_job_status(self, job_status = 'FAILED', job_waiting = 0, **kwargs):
        '''Update the UploadJob row for this job (no-op when job_id is None).

        Extra keyword arguments (job_log, job_starttime, ...) are written
        to the corresponding columns; job_log is truncated to fit.
        '''
        from ..schema import UploadJob
        if self.job_id is not None:
            kk = dict(job_id = self.job_id,
                      job_waiting = job_waiting,
                      job_status = job_status,
                      **kwargs)
            if 'job_log' in kk.keys():
                # truncate so the value fits the job_log column
                if len(kk['job_log']) > 500:
                    kk['job_log'] = kk['job_log'][:500-1]
            if job_status == 'FAILED':
                print(f'Check job_id {self.job_id} : {job_status}')
                # write the hostname so we know where it failed.
                # (fixed: a stray trailing comma previously made this a 1-tuple)
                kk['job_host'] = self.prefs['hostname']
            UploadJob.update1(kk)

    def _handle_processed_and_src_paths(self, processed_files, new_files):
        '''
        Put the files in the proper place and compute checksums for new files.
        Call this from the apply method.

        processed_files : source paths that were consumed by processing;
                          they are moved from src_paths to processed_paths.
        new_files       : paths (relative to local_path) created by processing;
                          they are checksummed and appended to src_paths.
        '''
        n_jobs = DEFAULT_N_JOBS
        self.processed_paths = []
        for f in processed_files:
            i = np.where(self.src_paths.src_path == f)[0][0]
            self.processed_paths.append(self.src_paths.iloc[i])
            self.src_paths.drop(self.src_paths.iloc[i].name, axis = 0, inplace = True)
            self.src_paths.reset_index(drop = True, inplace = True)
        self.processed_paths = pd.DataFrame(self.processed_paths).reset_index(drop = True)

        # checksum the newly created files in parallel
        res = Parallel(n_jobs = n_jobs)(delayed(_checksum_files)(
            path,
            local_path = self.local_path) for path in new_files)
        for r in res:
            r['job_id'] = self.job_id
        self.src_paths = pd.concat([self.src_paths, pd.DataFrame(res)], ignore_index = True)
        # drop duplicate paths in case there are any
        self.src_paths = self.src_paths.drop_duplicates(subset = ['src_path'], keep = 'last')

    def _post_upload(self):
        # hook for subclasses: insert rule-specific tables after upload
        return

    def _upload(self):
        '''Copy src_paths to the bucket and register everything in the tables.

        Uploads with copy_to_s3 (hashes were already verified in apply),
        then in one transaction inserts File rows, optionally the
        Session/Dataset/DataFiles rows, and the ProcessedFile rows so the
        consumed source files can be safely deleted.
        '''
        # destination in the bucket is actually the path
        dst = [k for k in self.src_paths.src_path.values]
        # source is the place where data are
        src = [Path(self.local_path) / p for p in self.src_paths.src_path.values] # same as md5
        # s3 copy in parallel; hashes were compared before so no need to do it now.
        self.set_job_status(job_status = 'WORKING',
                            job_log = datetime.now().strftime(f'Uploading {len(src)} files %Y %m %d %H:%M:%S'),
                            job_waiting = 0)
        copy_to_s3(src, dst, md5_checksum = None, storage_name = self.upload_storage)
        self.set_job_status(job_status = 'WORKING',
                            job_log = datetime.now().strftime('%Y %m %d %H:%M:%S'),
                            job_endtime = datetime.now(),
                            job_waiting = 0)
        import traceback
        with self.schema.dj.conn().transaction:  # make it all update at the same time
            # insert to Files so we know where to get the data
            insfiles = []
            for i, f in self.src_paths.iterrows():
                insfiles.append(dict(file_path = f.src_path,
                                     storage = self.upload_storage,
                                     file_datetime = f.src_datetime,
                                     file_size = f.src_size,
                                     file_md5 = f.src_md5))
            if len(insfiles):
                try:
                    self.schema.File.insert(insfiles)
                except Exception as err:
                    print(f'There was an error inserting the files to the File table. {insfiles}')
                    print(err)
                    self.set_job_status(job_status = 'FAILED',
                                        job_log = f'DUPLICATES? {[k["file_path"] for k in insfiles]}')
                    return
            # Add to dataset?
            job = self.jobquery.fetch(as_dict = True)[0]
            # check if it has a dataset
            if all([job[a] is not None for a in ['subject_name', 'session_name', 'dataset_name']]):
                for i, p in enumerate(insfiles):
                    insfiles[i] = dict(subject_name = job['subject_name'],
                                       session_name = job['session_name'],
                                       dataset_name = job['dataset_name'],
                                       file_path = p['file_path'],
                                       storage = self.upload_storage)
                if len(insfiles):
                    seskey = dict(subject_name = job['subject_name'],
                                  session_name = job['session_name'])
                    if not len(self.schema.Session & seskey): # check if the session needs to be added
                        # TODO: This needs to parse the session datetime from the path otherwise it won't be able to insert.
                        self.schema.Session.insert1(seskey, skip_duplicates = True)
                    dsetkey = dict(subject_name = job['subject_name'],
                                   session_name = job['session_name'],
                                   dataset_name = job['dataset_name'])
                    if not len(self.schema.Dataset & dsetkey): # check if the dataset needs to be added
                        self.schema.Dataset.insert1(dsetkey,
                                                    skip_duplicates = True)
                    self.schema.Dataset.DataFiles.insert(insfiles)
                self.dataset_key = dict(subject_name = job['subject_name'],
                                        session_name = job['session_name'],
                                        dataset_name = job['dataset_name'])
            self.inserted_files += insfiles
            # Insert the processed files so the deletions are safe
            if self.processed_paths is not None:
                ins = []
                for i, f in self.processed_paths.iterrows():
                    ins.append(dict(file_path = f.src_path,
                                    file_datetime = f.src_datetime,
                                    file_size = f.src_size,
                                    file_md5 = f.src_md5))
                if len(ins):
                    self.schema.ProcessedFile.insert(ins)
            if len(self.inserted_files): # keep the job in the queue
                # completed
                self.set_job_status(job_status = 'COMPLETED',
                                    job_log = f'UPLOADED {len(self.inserted_files)}',
                                    job_endtime = datetime.now(),
                                    job_waiting = 0)
            else:
                self.set_job_status(job_status = 'FAILED',
                                    job_log = 'No files inserted',
                                    job_endtime = datetime.now(),
                                    job_waiting = 0)

    def _apply_rule(self):
        # this rule does nothing, so the src_paths are going to be empty,
        # and the "paths" are going to be the src_paths
        self.processed_paths = None # processed paths are just the same, no file changed, so no need to do anything.
        # needs to compute the checksum on all the new files
        return self.src_paths.src_path.values

__init__(job_id, prefs=None)

Rule to apply on upload.

    1) Checksum on the files; compare with provided (reserve job if use_db)
    2) Apply function
    3) Checksum on the output - the files that changed
    4) Submit upload
    5) Update tables

Can submit job on slurm, some of these can be long or take resources.

Source code in labdata/rules/utils.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    def __init__(self,job_id,prefs = None):
        '''
        Rule to apply on upload.

        job_id : primary key of the UploadJob row this rule will process.
        prefs  : preferences dictionary; when None, the package-level
                 preferences are used.

        1) Checksum on the files; compare with provided (reserve job if use_db)
        2) Apply function
        3) Checksum on the output - the files that changed
        4) Submit upload
        5) Update tables

        Can submit job on slurm, some of these can be long or take resources.
        '''
        self.rule_name = 'default'  # identifies the rule in the UploadJob table

        self.job_id = job_id
        self.src_paths = None        # files assigned to this job (filled in apply)
        self.processed_paths = None  # source files consumed by processing
        self.dst_paths = None
        self.inserted_files = []     # rows inserted in the File table on upload
        if prefs is None:
            from ..utils import prefs
        self.prefs = prefs
        self.local_path = self.prefs['local_paths'][0]  # root for all relative file paths
        self.dataset_key = None # will get written on upload, use in _post_upload
        self.max_concurrent = -1  # maximum number of concurrent UploadJobs that can run [-1 is infinite].
        self.schema = None  # project schema, loaded when the job is taken

FixedBrainRule

Bases: UploadRule

Source code in labdata/rules/imaging.py
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class FixedBrainRule(UploadRule):
    '''Upload rule for fixed tissue microscopy data.

    Compresses a multi-folder TIFF stack (e.g. lightsheet output) to a
    single .zarr.zip file and registers it in the FixedBrain table.
    '''
    def __init__(self, job_id, prefs = None):
        super(FixedBrainRule,self).__init__(job_id = job_id, prefs = prefs)
        self.rule_name = 'fixed_brain'
        # filled by parse_metadata before insertion in FixedBrain
        self.metadata = dict(num_channels = None,
                             um_per_pixel = None,
                             hardware = 'unknown',
                             width = None,
                             height = None)
        self.max_concurrent = 2

    def parse_metadata(self,data):
        '''
        Reads metadata from a MultifolderTiffStack.

        Fills self.metadata with pixel size (from OME-XML PhysicalSize
        attributes, when present), hardware, channel count and frame size.
        '''
        from tifffile import TiffFile
        tf = TiffFile(data.filenames[0][0])
        metadata = tf.ome_metadata
        um_per_pixel = []
        if metadata is not None: # to work with non-OME files
            for t in metadata.split('\n'):
                if 'Pixels' in t:
                    for p in t.strip('>').split(' '):
                        if 'PhysicalSize' in p:
                            # attribute looks like PhysicalSizeX="1.25"
                            um_per_pixel.append(float(p.split('=')[1].split('"')[1]))
            self.metadata['um_per_pixel'] = um_per_pixel[::-1]

            if 'UltraII' in metadata:
                self.metadata['hardware'] = 'lightsheet UltraII'

        self.metadata['num_channels'] = len(data.filenames)
        self.metadata['height'] = data.shape[-2]
        self.metadata['width'] = data.shape[-1]

    def _apply_rule(self):
        '''Compress the TIFF stack to .zarr.zip and swap it into src_paths.'''
        # try extensions in order of preference; stop at the first that matches
        extensions = ['.ome.tif','.tif','.TIFF','.TIF']
        for extension in extensions:
            files_to_compress = list(filter(lambda x: (extension in x),
                                            self.src_paths.src_path.values))
            if len(files_to_compress):
                break # found files
        local_path = self.prefs['local_paths'][0]
        local_path = Path(local_path)

        new_files = []
        full_file_paths = [local_path/f for f in files_to_compress]
        # one folder per channel, sorted so channel order is deterministic
        channel_folders = np.sort(np.unique([p.parent for p in full_file_paths]))
        print(f'Found folders: {channel_folders}')
        data = MultifolderTiffStack(channel_folders = channel_folders,
                                    extensions = extensions)
        compressed_file = channel_folders[0].parent.with_suffix('.zarr.zip')
        # read the first file and get metadata
        self.parse_metadata(data)
        # compression to zarr.zip
        self.set_job_status(job_status = 'WORKING',
                            job_log = datetime.now().strftime('Compressing data %Y %m %d %H:%M:%S'),
                            job_waiting = 0)
        z1 = compress_imaging_stack(data,
                                    compressed_file,
                                    chunksize = 4,
                                    compression = 'zstd',
                                    clevel = 6,
                                    shuffle = 1,
                                    filters = [])
        # store the path relative to local_path
        new_files.append(str(compressed_file).replace(str(local_path),'').strip(os.sep))
        if len(new_files):
            self._handle_processed_and_src_paths(files_to_compress, new_files)
        return self.src_paths
    # need to implement post-upload for lightsheet.
    def _post_upload(self):
        '''Insert the compressed stack in the FixedBrain table.'''
        if self.dataset_key is not None:
            if len(self.inserted_files) > 0:
                storage = self.inserted_files[0]['storage']
                filenames = [f['file_path'] for f in self.inserted_files if f['file_path'].endswith('.zarr.zip')]
                if not len(filenames):
                    print('Could not find zarr compressed stack.')
                    return  # fixed: previously fell through to filenames[0] and raised IndexError
                self.schema.FixedBrain().insert1(dict(self.dataset_key,
                                         file_path = filenames[0],
                                         storage = storage,
                                         **self.metadata),allow_direct_insert=True)

parse_metadata(data)

Reads metadata from a MultifolderTiffStack

Source code in labdata/rules/imaging.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def parse_metadata(self,data):
    '''
    Reads metadata from a MultifolderTiffStack.

    Populates self.metadata with the pixel size (parsed from OME-XML
    PhysicalSize attributes when available), the hardware name, the
    number of channels and the frame width/height.
    '''
    from tifffile import TiffFile
    ome_xml = TiffFile(data.filenames[0][0]).ome_metadata
    pixel_sizes = []
    if ome_xml is not None:  # to work with non-OME files
        for line in ome_xml.split('\n'):
            if 'Pixels' not in line:
                continue
            for attribute in line.strip('>').split(' '):
                # attributes look like PhysicalSizeX="1.25"
                if 'PhysicalSize' in attribute:
                    pixel_sizes.append(float(attribute.split('=')[1].split('"')[1]))
        self.metadata['um_per_pixel'] = pixel_sizes[::-1]

        if 'UltraII' in ome_xml:
            self.metadata['hardware'] = 'lightsheet UltraII'

    self.metadata['num_channels'] = len(data.filenames)
    self.metadata['height'] = data.shape[-2]
    self.metadata['width'] = data.shape[-1]

EphysRule

Bases: UploadRule

Source code in labdata/rules/ephys.py
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class EphysRule(UploadRule):
    '''Upload rule for electrophysiology (SpikeGLX) data.

    Compresses the raw .ap.bin/.lf.bin binaries before upload and
    registers the recording in the EphysRecording table afterwards.
    '''
    def __init__(self, job_id, prefs = None):
        super(EphysRule, self).__init__(job_id = job_id, prefs = prefs)
        self.rule_name = 'ephys'
        self.max_concurrent = 2

    def _apply_rule(self):
        '''Compress the raw binaries and replace them in src_paths.'''
        # select the ap/lf binaries that still need compression
        raw_binaries = [p for p in self.src_paths.src_path.values
                        if ('.ap.bin' in p) or ('.lf.bin' in p)]
        n_jobs = DEFAULT_N_JOBS
        # compress these in parallel, will work for multiprobe sessions faster?
        if len(raw_binaries): # in some cases data might have already been compressed
            self.set_job_status(job_status = 'WORKING',
                                job_log = datetime.now().strftime(f'Compressing {len(raw_binaries)} files [%Y %m %d %H:%M:%S]'))
            results = Parallel(n_jobs = n_jobs)(delayed(compress_ephys_file)(
                binary_path,
                local_path = self.local_path,
                n_jobs = n_jobs,
                prefs = self.prefs) for binary_path in raw_binaries)
            # stack the resulting files and add them to the path
            compressed_paths = np.stack(results).flatten()
            self.set_job_status(job_status = 'WORKING',
                                job_log = datetime.now().strftime(f'Compressed {len(raw_binaries)} files [%Y %m %d %H:%M:%S]'))
            self._handle_processed_and_src_paths(raw_binaries, compressed_paths)
        return self.src_paths

    def _post_upload(self):
        '''Register the uploaded recording and its nidq events.'''
        if self.dataset_key is not None:
            self.schema.EphysRecording().add_spikeglx_recording(self.dataset_key)
            self.schema.EphysRecording().add_nidq_events(self.dataset_key)

TwoPhotonRule

Bases: UploadRule

Source code in labdata/rules/imaging.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class TwoPhotonRule(UploadRule):
    '''Upload rule for two-photon microscopy data (Scanbox).

    Compresses a single .sbx file to .zarr.zip, collects the recording
    and per-plane metadata, and registers them in TwoPhoton tables.
    '''
    def __init__(self, job_id, prefs = None):
        super(TwoPhotonRule,self).__init__(job_id = job_id, prefs = prefs)
        self.rule_name = 'two_photon'
        self.recording_metadata = None  # filled by process_sbx
        self.planes_metadata = []       # one dict per imaging plane
        self.max_concurrent = 2

    def _apply_rule(self):
        '''Find the sbx file, compress it and swap it into src_paths.'''
        files_to_compress = list(filter(lambda x: ('.sbx' in x),
                                        self.src_paths.src_path.values))
        if not len(files_to_compress):
            print('There are no sbx files.')
            files_to_compress = list(filter(lambda x: ('.tif' in x),
                                        self.src_paths.src_path.values))
            if len(files_to_compress):
                raise ValueError('Tiff processing not implemented for this rule.')

        local_path = self.prefs['local_paths'][0]
        local_path = Path(local_path)
        # TODO: make this work for tiff files also
        if len(files_to_compress):
            if len(files_to_compress)>1:
                raise ValueError(f'Can only handle one file {files_to_compress}')
            new_files = []
            for f in files_to_compress:
                compressed_file = self.process_sbx(local_path/f)
                # pass the file path without the "local_path"!
                new_files.append(str(compressed_file).replace(str(local_path),'').strip(os.sep))
            if len(new_files):
                self._handle_processed_and_src_paths(files_to_compress, new_files)
        return self.src_paths

    def process_sbx(self,sbxfile):
        '''Compress an sbx file to .zarr.zip and collect its metadata.

        Returns the path of the compressed file; raises if sbxreader is
        not installed (after marking the job FAILED).
        '''
        try:
            from sbxreader import sbx_memmap
        except ImportError:
            # fixed: used an undefined name "pref" and then fell through
            # to sbx_memmap, which would raise a confusing NameError.
            msg = f'Could not import SBXREADER: "pip install sbxreader" on {self.prefs["hostname"]}'
            print(msg)
            self.set_job_status(job_status = 'FAILED',job_log = msg,job_waiting = 0)
            raise
        sbx = sbx_memmap(sbxfile)
        compressed_file = sbxfile.with_suffix('.zarr.zip')
        z1 = compress_imaging_stack(sbx,
                                    compressed_file,
                                    chunksize = 128,
                                    compression = 'zstd',#'blosc2',
                                    clevel = 6,
                                    shuffle = 1)
        nframes,nplanes,nchannels,H,W = z1.shape

        pmt_gain = [sbx.metadata['pmt0_gain']]
        if nchannels>1:
            # fixed: a stray trailing comma appended a nested list instead
            # of the second channel gain.
            pmt_gain += [sbx.metadata['pmt1_gain']]

        self.recording_metadata = dict(n_planes = nplanes,
                                       n_channels = nchannels,
                                       n_frames = nframes,
                                       width = W,
                                       height = H,
                                       magnification = sbx.metadata['magnification'],
                                       objective_angle = sbx.metadata['stage_angle'],
                                       objective = sbx.metadata['objective'],
                                       um_per_pixel = np.array([sbx.metadata['um_per_pixel_x'],
                                                                sbx.metadata['um_per_pixel_y']]),
                                       frame_rate = sbx.metadata['frame_rate']/nchannels,
                                       scanning_mode = sbx.metadata['scanning_mode'],
                                       pmt_gain = np.array(pmt_gain),
                                       imaging_software = f"scanbox_{sbx.metadata['scanbox_version']}")
        for iplane in range(nplanes):
            depth = sbx.metadata['stage_pos'][-1]
            if nplanes > 1:
                # multiplane recordings: offset each plane by the ETL position
                depth += sbx.metadata['etl_pos'][iplane] - sbx.metadata['etl_pos'][0]

            self.planes_metadata.append(dict(plane_num = iplane,
                                             plane_depth = depth))
        return compressed_file

    def _post_upload(self):
        '''Insert the compressed stack and plane metadata in TwoPhoton tables.'''
        if self.dataset_key is not None:
            if len(self.inserted_files) > 0:
                storage = self.inserted_files[0]['storage']
                filenames = [f['file_path'] for f in self.inserted_files if f['file_path'].endswith('.zarr.zip')]
                if not len(filenames):
                    print('Could not find zarr compressed stack.')
                    return  # nothing to insert without the compressed stack
                self.schema.TwoPhoton().insert1(dict(self.dataset_key,
                                         file_path = filenames[0],
                                         storage = storage,
                                         **self.recording_metadata),allow_direct_insert=True)
                self.schema.TwoPhoton.Plane().insert([dict(self.dataset_key,**d) for d in self.planes_metadata],allow_direct_insert=True)

OnePhotonRule

Bases: UploadRule

Source code in labdata/rules/imaging.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class OnePhotonRule(UploadRule):
    '''Upload rule for one-photon imaging data (widefield - labcams).

    Compresses the single raw .dat binary to a .zarr.zip stack before
    upload and registers the widefield dataset afterwards.
    '''
    def __init__(self, job_id, prefs = None):
        super(OnePhotonRule, self).__init__(job_id = job_id, prefs = prefs)
        self.rule_name = 'one_photon'
        self.imaging_metadata = None
        self.max_concurrent = 2

    def _apply_rule(self):
        '''Compress the raw binary and swap it into src_paths.'''
        dat_files = [p for p in self.src_paths.src_path.values if '.dat' in p]
        # TODO: make this work for tiff files also
        base_path = Path(self.prefs['local_paths'][0])
        if len(dat_files):
            if len(dat_files) > 1:
                raise ValueError(f'Can only handle one file {dat_files}')
            new_files = []
            for rel_path in dat_files:
                raw_file = base_path/rel_path
                stack = mmap_wfield_binary(raw_file)
                target = raw_file.with_suffix('.zarr.zip')
                # compression to zarr.zip
                self.set_job_status(job_status = 'WORKING',
                                    job_log = datetime.now().strftime('Compressing data %Y %m %d %H:%M:%S'),
                                    job_waiting = 0)
                compress_imaging_stack(stack,
                                       target,
                                       chunksize = 16,
                                       compression = 'zstd',
                                       clevel = 6,
                                       shuffle = 1,
                                       filters = [])
                # pass the file path without the "local_path"!
                new_files.append(str(target).replace(str(base_path),'').strip(os.sep))
            if len(new_files):
                self._handle_processed_and_src_paths(dat_files, new_files)
        return self.src_paths

    # need to implement post-upload for widefield imaging.
    def _post_upload(self):
        '''Register the widefield dataset once files are uploaded.'''
        if self.dataset_key is not None:
            insert_widefield_dataset(self.schema, self.dataset_key, local_paths = [self.local_path])

MiniscopeRule

Bases: UploadRule

Source code in labdata/rules/imaging.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
class MiniscopeRule(UploadRule):
    '''Upload rule for (UCLA) Miniscope imaging data.

    Concatenates the session's .avi movies into a single compressed
    zarr.zip stack before upload, then registers the dataset.
    '''
    def __init__(self, job_id, prefs = None):
        super().__init__(job_id = job_id, prefs = prefs)
        self.rule_name = 'miniscope'
        self.imaging_metadata = None
        self.max_concurrent = 4

    def _apply_rule(self):
        from natsort import natsorted
        # collect the .avi movies for this job in natural (recording) order
        avi_files = natsorted([str(p) for p in self.src_paths.src_path.values
                               if '.avi' in p])
        root = Path(self.prefs['local_paths'][0])
        movie_paths = [root/f for f in avi_files]
        # open all movies as one continuous stack
        from ..stacks import VideoStack
        stack = VideoStack(movie_paths)  # this does not support multiple channels at the moment.
        out_file = (Path(movie_paths[0]).parent/'miniscope_stack').with_suffix('.zarr.zip')
        self.set_job_status(job_status = 'WORKING',
                            job_log = datetime.now().strftime('Compressing data %Y %m %d %H:%M:%S'),
                            job_waiting = 0)
        compress_imaging_stack(stack,
                               out_file,
                               chunksize = 512,
                               compression = 'zstd',
                               clevel = 6,
                               shuffle = 1,
                               filters = [])
        # store the new path relative to the local data folder
        produced = [str(out_file).replace(str(root),'').strip(os.sep)]
        if len(produced):
            self._handle_processed_and_src_paths(avi_files, produced)
        return self.src_paths

    def _post_upload(self):
        # register the dataset once the compressed stack has been uploaded
        if self.dataset_key is not None:
            insert_miniscope_dataset(self.schema, self.dataset_key, local_paths = [self.local_path])

process_upload_jobs(key=None, rule='all', n_jobs=8, job_host=None, force=False, prefs=None)

Process UploadJobs using UploadRule(s). Custom upload rules can be added as plugins, just include in prefs['upload_rules']

Joao Couto - labdata 2024

Source code in labdata/rules/__init__.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def process_upload_jobs(key = None,
                        rule = 'all',
                        n_jobs = 8,
                        job_host = None,
                        force = False,
                        prefs = None):
    '''
    Process UploadJobs using UploadRule(s).
    Custom upload rules can be added as plugins, just include in prefs['upload_rules']

    Parameters
    ----------
    key : dict or None
        Restriction to select specific jobs; when None all waiting jobs run.
    rule : str
        Kept for backward compatibility; the rule applied to each job is
        taken from the job's own ``job_rule`` field.
    n_jobs : int
        Number of parallel workers when more than one job is selected.
    job_host : str or None
        Restrict to jobs assigned to this host ('self' uses prefs['hostname']).
    force : bool
        Reset ``job_waiting`` on the selected jobs before running them.
    prefs : dict or None
        Preferences passed through to the rules.

    Joao Couto - labdata 2024
    '''
    from tqdm import tqdm
    from ..schema import UploadJob
    def _job(j, rule = None, prefs = None):
        # Run a single job through its registered rule (or the generic one).
        res = None  # fix: avoid UnboundLocalError when the job is missing
        jb = (UploadJob() & f'job_id = {j}').fetch1()
        if jb is not None:
            if jb['job_rule'] in rulesmap.keys():
                rl = rulesmap[jb['job_rule']](j, prefs = prefs)
            else:
                rl = UploadRule(j, prefs = prefs)
            res = rl.apply()
        return res
    if key is None:
        jobs = (UploadJob() & 'job_waiting = 1').fetch('job_id')
        if job_host is not None:
            if job_host == 'self':
                job_host = prefs['hostname']
            jobs = (UploadJob() &
                    'job_waiting = 1' &
                    f'job_host = "{job_host}"').fetch('job_id')
    else:
        jobs = (UploadJob() & key).fetch('job_id')
    if force:
        for j in jobs:
            UploadJob().update1(dict(job_id  = j, job_waiting = 1)) # reset job
    if len(jobs) == 1:
        # single job: run in-process (easier debugging, no pool overhead)
        res = [_job(jobs[0], rule = rule, prefs = prefs)]
    else:
        res = Parallel(backend='loky',n_jobs = n_jobs)(delayed(_job)(u,
                                                                     rule = rule,
                                                                     prefs = prefs)
                       for u in tqdm(jobs,desc = "Running upload jobs: "))
    return res

Auxiliary Functions

Upload rules for lab data management.

This module provides rules for processing data uploads to the database. Rules handle validation, preprocessing, and storage of different data types.

Default rules included:

  • UploadRule: Base rule for generic file uploads
  • EphysRule: Rule for electrophysiology data (SpikeGLX)
  • TwoPhotonRule: Rule for two-photon microscopy data (ScanImage/Scanbox)
  • OnePhotonRule: Rule for one-photon imaging data (Widefield - labcams)
  • MiniscopeRule: Rule for (UCLA) Miniscope imaging data
  • FixedBrainRule: Rule for fixed tissue microscopy data
  • ReplaceRule: Rule for replacing existing files

Custom rules can be added to the user_preferences.json configuration.

MultifolderTiffStack

Bases: object

Source code in labdata/rules/imaging.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
class MultifolderTiffStack(object):
    def __init__(self,channel_folders, extensions = ['.ome.tif','.tif','.TIFF']):
        '''
        Simple class to access tiff files that are organized in folders.
        Each folder is a channel and contains multiple TIFF files.

        This is the format of the lightsheet microscope for example. 
        It is a place-holder class that should be modified to work for scanimage files also.
        '''
        self.nchannels = len(channel_folders)
        self.filenames = []
        for folder in channel_folders:
            for extension in extensions:
                tiffs = list(Path(folder).rglob('*' + extension))
                if len(tiffs) > 0:
                    self.filenames.append(natsorted(tiffs))
                    break # found extension
        # read the first file to get the frame shape and dtype
        im = read_ome_tif(self.filenames[0][0])
        if len(im.shape) == 2:
            # single-frame TIFFs: the number of frames is the number of tiff files
            self.nframes = len(self.filenames[0])
            self.frame_dims = im.shape
            self.dtype = im.dtype
        else:
            raise NotImplementedError('This needs to be changed to work with multiple-frame TIFF files.')
        # stack shape follows [nframes, nchannels, H, W]
        self.shape = (self.nframes,self.nchannels,*self.frame_dims)

    def __len__(self):
        return self.nframes

    def __getitem__(self, index):
        '''Index frames by slice, integer or iterable of integers.'''
        if isinstance(index, slice):
            frame_idx = range(*index.indices(self.nframes))
        elif isinstance(index, (int, np.integer)):
            # fix: isinstance covers all numpy integer widths, not just int32/int64
            frame_idx = [index]
        else:  # iterable of indices (e.g. list or np.array)
            frame_idx = index
        img = [self.get(i) for i in frame_idx]
        return np.stack(img).squeeze().reshape(len(img),self.nchannels, *self.frame_dims)

    def get(self,idx):
        # one frame per channel, stacked along the channel axis
        return np.stack([read_ome_tif(files[idx]) for files in self.filenames])

__init__(channel_folders, extensions=['.ome.tif', '.tif', '.TIFF'])

Simple class to access tiff files that are organized in folders. Each folder is a channel and contains multiple TIFF files.

This is the format of the lightsheet microscope for example. It is a place-holder class that should be modified to work for scanimage files also.

Source code in labdata/rules/imaging.py
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
def __init__(self,channel_folders, extensions = ['.ome.tif','.tif','.TIFF']):
    '''
    Simple class to access tiff files that are organized in folders.
    Each folder is a channel and contains multiple TIFF files.

    Parameters:
        channel_folders : one folder per channel, searched recursively.
        extensions      : candidate extensions tried in order; the first
                          one with matches in a folder is used.

    This is the format of the lightsheet microscope for example. 
    It is a place-holder class that should be modified to work for scanimage files also.
    '''
    self.nchannels = len(channel_folders)
    self.filenames = []
    for folder in channel_folders:
        for extension in extensions:
            tiffs = list(Path(folder).rglob('*' + extension))
            if len(tiffs) > 0:
                self.filenames.append(natsorted(tiffs))
                break # found extension
    # read the first file to determine frame shape and dtype
    im = read_ome_tif(self.filenames[0][0])
    if len(im.shape) == 2:
        # then the number frames is the number of tiff files
        self.nframes = len(self.filenames[0])
        self.frame_dims = im.shape
        self.dtype = im.dtype
    else:
        raise NotImplementedError('This needs to be changed to work with multiple-frame TIFF files.')
    # stack shape follows [nframes, nchannels, H, W]
    self.shape = (self.nframes,self.nchannels,*self.frame_dims)

compress_imaging_stack(stack, filename, chunksize=256, compression='blosc2', clevel=6, shuffle=1, filters=[], zarr_format=2, scratch_path=None, check_dataset=True)

stack is in shape [nframes,nchan,H,W]

Typical use case for two photon datasets: - blosc2 compression clevel 6, shuffle 1, no filters this will take ~10min/25Gb and the result is a file 77% of the original file. Typical use case for one photon datasets: - blosc2 compression clevel 6, shuffle 1, no filters Typical use for lightsheet imaging: - blosc2 compression clevel 6, shuffle 1, no filters

TODO: implement a way of changing the chunksize of the inner dimensions.. TODO: implement a way to skip the zip

Source code in labdata/rules/imaging.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def compress_imaging_stack(stack, filename, 
                           chunksize = 256,
                           compression = 'blosc2',
                           clevel = 6,
                           shuffle = 1,
                           filters = None,
                           zarr_format = 2,
                           scratch_path = None,
                           check_dataset = True):
    '''
    Compress an imaging stack to a zipped zarr store.

    stack is in shape [nframes,nchan,H,W]

    Typical use case for two photon datasets:
          - blosc2 compression clevel 6, shuffle 1, no filters
         this will take ~10min/25Gb and the result is a file 77% of the original file.
    Typical use case for one photon datasets:
          - blosc2 compression clevel 6, shuffle 1, no filters
    Typical use for lightsheet imaging:
          - blosc2 compression clevel 6, shuffle 1, no filters

    Parameters
    ----------
    filters : list or None
        Optional codec filters; 'delta' enables a Delta filter.
        (default None, equivalent to the old [] default)

    Returns the re-opened (read-only) zarr array.

    TODO: implement a way of changing the chunksize of the inner dimensions..
    TODO: implement a way to skip the zip

    '''

    import zarr
    import string
    from tqdm import tqdm
    from zipfile import ZipFile
    from pathlib import Path
    import numcodecs
    if filters is None:  # fix: avoid a mutable default argument
        filters = []
    filt = []
    if 'delta' in filters:
        from numcodecs import Delta
        filt += [Delta(dtype=stack.dtype)]
    if compression in ['zstd','lz4hc','lz4']:
        if zarr_format == 3:
            compressor = zarr.codecs.BloscCodec(cname = compression, clevel = clevel, shuffle = 'shuffle')
        else: # zarr v2 is smaller and faster at least Feb 2025..
            from numcodecs import Blosc
            zarr_format = 2
            compressor = Blosc(cname = compression, clevel = clevel, shuffle = shuffle)
    elif compression == 'blosc2':
        from imagecodecs.numcodecs import Blosc2
        numcodecs.register_codec(Blosc2)
        compressor = Blosc2(level=clevel, shuffle = shuffle)
        zarr_format = 2  # Blosc2 codec is only registered for zarr v2 here
    else:
        raise NotImplementedError(f'Compression {compression} not implemented')

    if scratch_path is None:
        # NOTE(review): 'prefs' here is a module-level global — confirm it is
        # always importable/defined when this function is called standalone.
        if 'scratch_path' in prefs:
            scratch_path = Path(prefs['scratch_path'])
        if scratch_path is None:
            scratch_path = Path('.')

    # unique temporary store name so concurrent jobs do not collide
    rand = ''.join(np.random.choice([s for s in string.ascii_lowercase + string.digits],9))
    tmppath = Path(scratch_path/f'temporary_zarr_{rand}.zarr')
    # chunk only along the frame axis; inner dims stay whole
    if len(stack.shape) == 4:
        chunks = (chunksize, 1,*stack.shape[-2:])
    elif len(stack.shape) == 3:
        chunks = (chunksize, *stack.shape[-2:])
    elif len(stack.shape) == 5:
        chunks = (chunksize,1, 1,*stack.shape[-2:])
    else:
        raise ValueError(f'Only 3d, 4d or 5d stacks are supported. Stack: {stack.shape}.')
    if zarr_format == 2:
        z1 = zarr.open(tmppath, mode='w', shape = stack.shape,
        chunks = chunks, dtype = stack.dtype, 
        compressor = compressor, filters = filt, zarr_format = 2)
    elif zarr_format == 3:
        z1 = zarr.create_array(store=tmppath, 
        shape=stack.shape, dtype=stack.dtype, chunks=chunks, compressors=compressor)
    else:
        raise NotImplementedError(f'The array format {zarr_format} is not implemented.')
    # write chunk by chunk so the source stack is never fully in memory
    for s in tqdm(chunk_indices(len(stack),chunksize),
                  desc = 'Compressing to zarr:'):
        z1[s[0]:s[1]] = np.array(stack[s[0]:s[1]])
    with ZipFile(filename,'w') as z:
        tmp = list(tmppath.rglob('*'))
        tmp = list(filter(lambda x: x.is_file(),tmp)) # for compat with zarr3
        [z.write(t,arcname=t.relative_to(tmppath)) for t in tqdm(
            tmp,
            desc='Saving to zip:')]
    # delete the temporary
    from shutil import rmtree
    rmtree(tmppath)
    # open the new array
    z1 = open_zarr(filename,mode = 'r')
    if check_dataset:
        # verify the round-trip chunk by chunk
        for s in tqdm(chunk_indices(len(stack),chunksize),desc = 'Checking data:'):
            if not np.all(z1[s[0]:s[1]] == np.array(stack[s[0]:s[1]])):
                raise(ValueError(f"Datasets are not equivalent, compression failed (unknown). "))
    return z1

compress_ephys_file(filename, local_path=None, ext='.bin', n_jobs=DEFAULT_N_JOBS, check_after_compress=True, prefs=None)

Compress ephys data

Source code in labdata/rules/ephys.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def compress_ephys_file(filename, local_path = None, 
                        ext = '.bin',
                        n_jobs = DEFAULT_N_JOBS,
                        check_after_compress = True,
                        prefs = None):
    '''
    Compress ephys data (SpikeGLX binary) into an mtscomp .cbin/.ch pair.

    Parameters
    ----------
    filename : str or Path
        Path of the binary file, relative to ``local_path``.
    local_path : str or Path, optional
        Root data folder; defaults to prefs['local_paths'][0].
    ext : str
        Extension of the raw binary file (default '.bin').
    n_jobs : int
        Number of compression threads.
    check_after_compress : bool
        Verify the compressed data against the original after writing.
    prefs : dict, optional
        Preferences; loaded from labdata defaults when None.

    Returns
    -------
    (cbin, ch) : tuple of str
        Paths of the compressed binary and its JSON header,
        relative to ``local_path``.

    Raises
    ------
    OSError when the binary or its .meta sidecar is missing.
    '''
    if prefs is None:
        from ..utils import prefs
    if local_path is None:
        local_path = prefs['local_paths'][0]
    local_path = Path(local_path)

    from spks.spikeglx_utils import read_spikeglx_meta

    binfile = local_path/filename
    if not binfile.exists():
        raise OSError(f'Could not find binfile to compress ephys {binfile}')

    # the .meta sidecar holds the sampling rate and channel count
    metafile = local_path/str(filename).replace(ext,'.meta')
    if not metafile.exists():
        raise OSError(f'Could not find metafile to compress ephys {metafile}')

    meta = read_spikeglx_meta(metafile)  # to get the sampling rate and nchannels
    srate = meta['sRateHz']
    nchannels = meta['nSavedChans']
    from mtscomp import compress  # fix: dropped unused 'decompress' import
    # Compress a .bin file into a pair .cbin (compressed binary file) and .ch (JSON file).
    cbin,ch = (str(binfile).replace(ext,'.cbin'),str(binfile).replace(ext,'.ch'))
    compress(binfile, cbin, ch,
             sample_rate = srate, n_channels = int(nchannels),
             check_after_compress = check_after_compress,
             chunk_duration = 1, dtype=np.int16, n_threads = n_jobs)
    # return the new paths relative to the local data folder
    return cbin.replace(str(local_path),'').strip(os.sep),ch.replace(str(local_path),'').strip(os.sep)

extract_events_from_nidq(paths)

Extracts the events from the nidq files (spikeglx) and formats it so they can be inserted in the database.

Source code in labdata/rules/ephys.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def extract_events_from_nidq(paths):
    '''
    Extracts the events from the nidq files (spikeglx) and formats it so they can be inserted in the database.

    Parameters
    ----------
    paths : list of Path
        Candidate files for one recording; the first .bin (or .cbin) is used.

    Returns
    -------
    events : list of dict
        One dict per digital line with event_name, event_timestamps (seconds)
        and event_values (1 at onsets, 0 at offsets, time-sorted).
    dat : array-like
        The raw nidq data (memmap or mtscomp reader).
    '''
    from spks.spikeglx_utils import load_spikeglx_binary,read_spikeglx_meta
    from spks.sync import unpackbits_gpu

    # prefer the raw .bin; fall back to the compressed .cbin
    nidqfilepath = list(filter(lambda x: '.bin' in x.suffix ,paths))
    if not len(nidqfilepath):
        # didnt find the files, try compressed
        nidqfilepath = list(filter(lambda x: '.cbin' in x.suffix ,paths))
        if not len(nidqfilepath):
            raise(OSError(f'Could not find the nidaq file path for {paths}.'))
    nidqfilepath = nidqfilepath[0]
    # load the nidq file
    if str(nidqfilepath).endswith('.bin'):
        dat,meta = load_spikeglx_binary(nidqfilepath)
    else:
        from mtscomp import decompress
        dat = decompress(nidqfilepath)
        meta = read_spikeglx_meta(nidqfilepath.with_suffix('.meta'))
    # read the sync channel (last column); onsets/offsets are dicts keyed by line
    onsets,offsets = unpackbits_gpu(dat[:,-1])
    stream_name = 'nidq'
    if '.obx.' in str(nidqfilepath):
        # onebox has a different file structure, the ai goes in the second last channel
        stream_name = 'obx'
        s,e = unpackbits_gpu(dat[:,-2])
        for k in s.keys():
            # merge the extra lines under distinct 'io*' names
            onsets[f'io{k}'] = s[k]
            offsets[f'io{k}'] = e[k]
    events = []
    for k in onsets.keys():
        o = onsets[k]
        f = offsets[k]
        # interleave onsets (value 1) and offsets (value 0), sorted by time
        ts = np.hstack([o,f])
        v = np.hstack([np.array(o)*0+1,np.array(f)*0]).astype('uint8')
        ii = np.argsort(ts)
        events.append(dict(event_name = k,
                           event_timestamps = ts[ii]/meta['sRateHz'], # in seconds
                           event_values =  v[ii])) # store the onsets and offsets
    return events, dat

ephys_noise_statistics_from_file(filepath, channel_indices, gain, sampling_rate=30000, duration=60)

statistics = ephys_noise_statistics_from_file(filepath,channel_indices, gain, sampling_rate = 30000, duration = 60)

Gets the noise statistics from a raw data file. It won't parse the whole file, instead it will extract 2 chunks, one from t=duration to t=duration*2 and another from t=(end of recording − duration*2) to t=(end of recording − duration). Then computes: the peak to peak, min, max, median and absolute median deviation of those chunks.

This is useful just to compare the start and end of the recording or to have ballpark estimations of these values. For more accurate measurements split the recording in chunks of e.g. 1 second, compute it for the entire file, then average and std. These estimates may be skewed if there are artifacts in the chunks.

Joao Couto - labdata 2024

Source code in labdata/rules/ephys.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def ephys_noise_statistics_from_file(filepath,channel_indices, gain, sampling_rate = 30000, duration = 60):
    '''
    statistics = ephys_noise_statistics_from_file(filepath,channel_indices, gain, sampling_rate = 30000, duration = 60)

    Gets the noise statistics from a raw data file. It won't parse the whole file, instead it will extract 2 chunks, one 
    from t=duration to t=duration*2 and another from t=end of recording-duration*2 to t=end of recording-duration.
    Then computes: the peak to peak, min, max, median and absolute median deviation of those chunks.

    This is useful just to compare the start and end of the recording or to have ballpark estimations of these values. 
    For more accurate measurements split the recording in chunks of e.g. 1 second, compute it for the entire file, then average and std.
    These estimates may be skewed if there are artifacts in the chunks.

    Joao Couto - labdata 2024
    '''

    filepath = Path(filepath)
    if str(filepath).endswith('.cbin'):
        from mtscomp import decompress
        data = decompress(filepath) #,filepath.with_suffix('.ch'))
    elif str(filepath).endswith('.bin'):
        from spks.spikeglx_utils import load_spikeglx_binary
        data,meta = load_spikeglx_binary(filepath)
    else:
        raise ValueError(f'Could not handle extension: {filepath}')
    # shrink the window for recordings shorter than the requested duration
    if data.shape[0]/sampling_rate < duration:
        duration = np.floor((data.shape[0]/sampling_rate)/2)
    # read the head and tail data (scaled to physical units by 'gain')
    head_data = np.array(data[int(sampling_rate*duration):int(sampling_rate*duration)*2],dtype=np.float32)*gain
    tail_data = np.array(data[-int(sampling_rate*duration)*2:-int(sampling_rate*duration)],dtype = np.float32)*gain
    dd = [head_data,tail_data]
    # one column per chunk (0: head, 1: tail), one row per channel
    res = dict(channel_peak_to_peak = np.zeros((len(channel_indices),len(dd))),
                 channel_median = np.zeros((len(channel_indices),len(dd))),
                 channel_mad = np.zeros((len(channel_indices),len(dd))),
                 channel_max = np.zeros((len(channel_indices),len(dd))),
                 channel_min = np.zeros((len(channel_indices),len(dd))))
    from scipy.stats import median_abs_deviation
    for i,d in enumerate(dd):
        res['channel_mad'][:,i] = median_abs_deviation(d[:,channel_indices],axis = 0) 
        res['channel_max'][:,i] = np.max(d[:,channel_indices],axis = 0) 
        res['channel_min'][:,i] = np.min(d[:,channel_indices],axis = 0) 
        res['channel_median'][:,i] = np.median(d[:,channel_indices],axis = 0) 
        res['channel_peak_to_peak'][:,i] = res['channel_max'][:,i]-res['channel_min'][:,i]
    return res

get_probe_configuration(meta)

Meta can be a file or a dictionary. Uses spks for now to parse the metadata.

Source code in labdata/rules/ephys.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get_probe_configuration(meta):
    '''
    Parse SpikeGLX metadata into a probe-configuration dict.

    Parameters
    ----------
    meta : dict-like, str or Path
        Either an already-parsed metadata mapping, or the path to a
        .meta file to be read with spks.

    Returns
    -------
    dict with probe_id, recording_software, recording_duration,
    sampling_rate, probe_type, probe_n_shanks, probe_gain,
    probe_n_channels, channel_idx, channel_coords, channel_shank
    and probe_recording_channels.

    Raises
    ------
    OSError when the file is missing or spks cannot be imported.
    '''
    if not hasattr(meta,'keys'):
        meta = Path(meta)
        if not meta.exists():
            raise OSError(f'File not found: {meta}')
        try:
            from spks.spikeglx_utils import read_spikeglx_meta
            # TODO: consider porting a minimal version over
        except ImportError:
            # fix: was a bare 'except' which hid unrelated errors
            raise OSError('Could not import spks: install from https://github.com/spkware/spks')
        meta = read_spikeglx_meta(meta)

    probe_type = str(int(meta['imDatPrb_type']))
    recording_software = 'spikeglx' # make this work with openephys also
    return dict(probe_id = str(int(meta['imDatPrb_sn'])),
                recording_software = recording_software,
                recording_duration = meta['fileTimeSecs'],
                sampling_rate = meta['sRateHz'],
                probe_type = probe_type,
                # these probe types are four-shank; all others treated as single shank
                probe_n_shanks = 4 if probe_type in ['24','2013','2014'] else 1,
                probe_gain = meta['conversion_factor_microV'][0],
                probe_n_channels = len(meta['channel_idx']),
                channel_idx = meta['channel_idx'],
                channel_coords = meta['coords'],
                channel_shank = meta['channel_shank'],
                probe_recording_channels = int(meta['nSavedChans']-1))

insert_miniscope_dataset(schema, key, local_paths=None, skip_duplicates=False)

Source code in labdata/rules/imaging.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def insert_miniscope_dataset(schema,key, local_paths = None, skip_duplicates = False):
    '''Insert a Miniscope dataset (stack metadata + timestamps/orientation events) into the database.

    Skips insertion (with a warning) when the dataset key already exists in schema.Miniscope.
    '''
    if len(schema.Miniscope & key):
        from warnings import warn
        warn(f'Dataset {key} was already inserted.')
        return 

    # locate the compressed stack and its sidecar files for this dataset
    filedict = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%.zarr.zip"').proj().fetch1()
    datafile = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%.zarr.zip"').get(local_paths = local_paths)[0]

    from ..utils import open_zarr
    dat = open_zarr(datafile)


    metafile = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%metaData.json"').get(local_paths = local_paths)[0]
    tsfile = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%timeStamps.csv"').get(local_paths = local_paths)[0]
    orifile = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%headOrientation.csv"').get(local_paths = local_paths)
    # read the metadata
    with open(metafile,'r') as d:
        metadata = json.load(d)

    frame_rate = metadata['frameRate']
    if type(frame_rate) is str: # the frame rate changed in different versions
        frame_rate = float(metadata['frameRate'].strip('FPS'))
    gain = metadata['gain'] # the gain changed in different versions..
    if type(gain) is str:
        # older versions store a label instead of a number
        if gain == 'Low':
            gain = 1
        elif gain == 'High':
            gain = 10
        else:
            gain = None
            print('Could not read gain')
    # one LED entry per excitation channel
    leds = [k for k in metadata.keys() if 'led' in k]
    # NOTE(review): assumes dat is [nframes, width, height] — confirm axis order
    miniscope_dict = dict(key,
                          n_channels = len(leds),
                          n_frames = dat.shape[0],
                          width = dat.shape[1],
                          height = dat.shape[2], 
                          device = metadata['deviceType'],
                          frame_rate = float(frame_rate),
                          sensor_gain = gain,
                          power = [metadata[k] for k in leds],
                          lens_tuning = metadata['ewl'],
                          **filedict)
    # read the timestamps
    timestamps = pd.read_csv(tsfile)
    del dat

    # frame-clock event stream (timestamps converted ms -> s)
    dataseteventsdict = [dict((schema.Dataset.proj() & key).fetch1(),
                            stream_name = 'miniscope')]
    dataseteventsstreams = [dict(dataseteventsdict[0],
                                 event_name = 'clock',
                                 event_timestamps = timestamps['Time Stamp (ms)'].values/1000.,
                                 event_values = timestamps['Frame Number'].values)]
    if len(orifile):
        # head-orientation quaternion streams, one per component
        head = pd.read_csv(orifile[0])
        for k in ['qw','qx','qy','qz']:
            dataseteventsstreams.append(dict(dataseteventsdict[0],
                                         event_name = k,
                                         event_timestamps = head['Time Stamp (ms)'].values/1000.,
                                         event_values = head[k].values))


    schema.DatasetEvents.insert(dataseteventsdict, allow_direct_insert=True, skip_duplicates=skip_duplicates)
    schema.DatasetEvents.Digital.insert(dataseteventsstreams, allow_direct_insert=True, skip_duplicates=skip_duplicates)
    schema.Miniscope.insert1(miniscope_dict, allow_direct_insert=True, skip_duplicates = skip_duplicates)
    return 

mmap_wfield_binary(filename, mode='r', nframes=None, shape=None, dtype='uint16')

Loads frames from a binary file as a memory map. This is useful when the data does not fit to memory.

Inputs: filename (str) : fileformat convention, file ends in _NCHANNELS_H_W_DTYPE.dat mode (str) : memory map access mode (default 'r') 'r' | Open existing file for reading only. 'r+' | Open existing file for reading and writing.
nframes (int) : number of frames to read (default is None: the entire file) offset (int) : offset frame number (default 0) shape (list|tuple) : dimensions (NCHANNELS, HEIGHT, WIDTH) default is None dtype (str) : datatype (default uint16) Returns: A memory mapped array with size (NFRAMES,NCHANNELS, HEIGHT, WIDTH).

Example: dat = mmap_dat(filename)

This is from wfield - jcouto

Source code in labdata/rules/imaging.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
def mmap_wfield_binary(filename,
                       mode = 'r',
                       nframes = None,
                       shape = None,
                       dtype='uint16'):
    '''
    Loads frames from a binary file as a memory map.
    This is useful when the data does not fit to memory.

    Inputs:
        filename (str)       : fileformat convention, file ends in _NCHANNELS_H_W_DTYPE.dat
        mode (str)           : memory map access mode (default 'r')
                'r'   | Open existing file for reading only.
                'r+'  | Open existing file for reading and writing.                 
        nframes (int)        : number of frames to read (default is None: the entire file)
        shape (list|tuple)   : dimensions (NCHANNELS, HEIGHT, WIDTH) default is None
                               (parsed from the filename when omitted)
        dtype (str)          : datatype (default uint16) 
    Returns:
        A memory mapped  array with size (NFRAMES,NCHANNELS, HEIGHT, WIDTH).

    Example:
        dat = mmap_dat(filename)

    This is from wfield - jcouto
    '''

    if not os.path.isfile(filename):
        raise OSError('File {0} not found.'.format(filename))
    if shape is None or dtype is None: # try to get it from the filename
        dtype,shape,_ = _parse_wfield_fname(filename,
                                            shape = shape,
                                            dtype = dtype)
    # normalize the dtype once (fix: a later redundant np.dtype() call made
    # the dtype-object branch dead code)
    if isinstance(dtype, str):
        dt = np.dtype(dtype)
    else:
        dt = dtype
    if nframes is None:
        # Get the number of samples from the file size
        nframes = int(os.path.getsize(filename)/(np.prod(shape)*dt.itemsize))
    return np.memmap(filename,
                     mode=mode,
                     dtype=dt,
                     shape = (int(nframes),*shape))

read_ome_tif(file)

Source code in labdata/rules/imaging.py
561
562
563
564
565
566
567
568
569
def read_ome_tif(file):
    '''Read the first page of an (OME) TIFF file and return it as an array.

    Raises OSError (chained to the original error) when the file cannot
    be read as a TIFF.
    '''
    from tifffile import TiffFile
    try:
        res = TiffFile(file).pages.get(0).asarray()
    except Exception as err:
        print(f'TIFF error for {file}')
        print(err)
        # fix: chain the original exception so the root cause is preserved
        raise OSError(f'TIFF error for {file}') from err
    return res

insert_widefield_dataset(schema, key, local_paths=None, skip_duplicates=False)

Insert widefield dataset keys (assumes data were collected with labcams, but this can be adapted to support other formats). ask jpcouto@gmail.com

Source code in labdata/rules/imaging.py
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
def insert_widefield_dataset(schema,key, local_paths = None, skip_duplicates = False):
    '''
    Insert widefield dataset keys (assumes data were collected with labcams
    but it can be adapted to support other formats) - ask jpcouto@gmail.com

    Parameters:
        schema           : module/namespace exposing the File, Dataset,
                           Widefield and DatasetEvents tables
        key (dict)       : restriction identifying the dataset to insert
        local_paths      : optional paths forwarded to File.get() to locate
                           the data locally
        skip_duplicates  : forwarded to the insert calls

    Inserts one row in schema.Widefield and, when LED events are found in
    the camera log, rows in schema.DatasetEvents / DatasetEvents.Digital.
    Returns None. If the dataset was already inserted, warns and returns.
    '''
    if len(schema.Widefield & key):
        from warnings import warn
        warn(f'Dataset {key} was already inserted.')
        return

    # Locate the zarr data file and the labcams log attached to this dataset.
    filedict = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%.zarr.zip"').proj().fetch1()
    datafile = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%.zarr.zip"').get(local_paths = local_paths)[0]
    camlogfile = (schema.File & (schema.Dataset.DataFiles & key) & 'file_path LIKE "%.camlog"').get(local_paths = local_paths)[0]

    from ..utils import open_zarr
    # dat is indexed below as (NFRAMES, NCHANNELS, HEIGHT, WIDTH)
    dat = open_zarr(datafile)

    dataseteventsdict = None
    dataseteventsstreams = None   # built together with dataseteventsdict
    software = 'unknown'
    try:
        from ..schema.utils import read_camlog
        comm,log = read_camlog(camlogfile)
        # LED onset comments logged by labcams: "#LED:channel,frame_id,timestamp"
        led = [c.split('LED:')[-1] for c in comm if c.startswith('#LED')] 
        if len(led):
            from io import StringIO
            led = pd.read_csv(StringIO('\n'.join(led)),delimiter = ',',header = None,names= ['channel','frame_id','timestamp'])
            # Per-channel frame rate: timestamps are in ms, so stride the
            # timestamp array by the number of channels before diffing.
            frame_rate = np.nanmean(1000./np.diff(led.timestamp.values[::dat.shape[1]]))
            dataseteventsdict = [dict((schema.Dataset.proj() & key).fetch1(),
                                  stream_name = 'widefield')]
            # One digital event stream per LED channel.
            dataseteventsstreams = [dict(dataseteventsdict[0],event_name = u,
                                     event_timestamps = led[led.channel.values == u].timestamp.values,
                                     event_values = led[led.channel.values == u].frame_id.values) for u in np.unique(led.channel.values)]
        else:
            print('There was no LED in the log, trying to get the frame rate from the camera log.')
            # Fall back to the camera timestamps (assumed seconds) in the log.
            frame_rate = np.nanmean(1./np.diff(log[1].values))

        software = 'labcams'
        lv = [l.split(':')[-1].strip(' ') for l in comm if 'labcams version' in l]
        if len(lv):
            software += ' '+lv[0]

    except Exception as err:
        from warnings import warn
        warn('Could not parse the frame rate.')
        print(err)
        frame_rate = -1   # sentinel: frame rate could not be determined
        # A failure midway can leave the event dictionaries half-built
        # (dict assigned but streams not); drop them so the inserts below
        # are skipped instead of raising NameError.
        dataseteventsdict = None
    widefielddict = dict((schema.Dataset.proj() & key).fetch1(),
                         n_channels = dat.shape[1],
                         n_frames = dat.shape[0],
                         height = dat.shape[2],
                         width = dat.shape[3],
                         frame_rate = frame_rate,
                         imaging_software = software,
                         **filedict)
    del dat  # release the zarr handle before the database inserts
    if dataseteventsdict is not None:
        schema.DatasetEvents.insert(dataseteventsdict, allow_direct_insert=True, skip_duplicates=skip_duplicates)
        schema.DatasetEvents.Digital.insert(dataseteventsstreams, allow_direct_insert=True, skip_duplicates=skip_duplicates)
    schema.Widefield.insert1(widefielddict, allow_direct_insert=True, skip_duplicates = skip_duplicates)
    return