diff --git a/SourceWrapper_Threshold_Scan.py b/SourceWrapper_Threshold_Scan.py new file mode 100644 index 0000000..85b98d8 --- /dev/null +++ b/SourceWrapper_Threshold_Scan.py @@ -0,0 +1,151 @@ +import argparse +import logging +import beam_test +import os +import numpy as np +import time +from astropix import astropixRun + + +def change_all_TuneDACs(file,TuneDAC): + with open(file, 'r') as stream: + data=stream.readlines() + + for i,line in enumerate(data): + if 'row' in line and len(line)>50: + for column in range(16): + line=line[:35+6*column]+bin(TuneDAC)[2:].zfill(3)[::-1]+line[38+6*column:] + data[i]=line + + with open(file,'w') as stream: + stream.writelines(data) + +def change_VPDAC(file,VPDAC): + with open(file, 'r') as stream: + data=stream.readlines() + + for i,line in enumerate(data): + if 'vpdac' in line: + line=line[:36]+str(VPDAC)+']\n' + data[i]=line + + with open(file,'w') as stream: + stream.writelines(data) + + +parser = argparse.ArgumentParser(description='Astropix Driver Code') +parser.add_argument('-n', '--name', default='', required=False, + help='Option to give additional name to output files upon running') + +parser.add_argument('-o', '--outdir', default='.', required=False, + help='Output Directory for all datafiles') + +parser.add_argument('-y', '--yaml', action='store', required=False, type=str, default = 'testconfig', + help = 'filepath (in config/ directory) .yml file containing chip configuration. Default: config/testconfig.yml (All pixels off)') + +parser.add_argument('-V', '--chipVer', default=2, required=False, type=int, + help='Chip version - provide an int') + +parser.add_argument('-s', '--showhits', action='store_true', + default=False, required=False, + help='Display hits in real time during data taking') + +parser.add_argument('-p', '--plotsave', action='store_true', default=False, required=False, + help='Save plots as image files. If set, will be saved in same dir as data. Default: FALSE') + +parser.add_argument('-c', '--saveascsv', action='store_true', + default=False, required=False, + help='save output files as CSV. If False, save as txt. Default: FALSE') + +parser.add_argument('-f', '--newfilter', action='store_true', + default=False, required=False, + help='Turns on filtering of strings looking for header of e0 in V4. If False, no filtering. Default: FALSE') + +parser.add_argument('-i', '--inject', action='store', default=None, type=int, nargs=2, + help = 'Turn on injection in the given row and column. Default: No injection') + +parser.add_argument('-v','--vinj', action='store', default = None, type=float, + help = 'Specify injection voltage (in mV). DEFAULT None (uses value in yml)') + +parser.add_argument('-a', '--analog', action='store', required=False, type=int, default = 0, + help = 'Turn on analog output in the given column. Default: Column 0.') + +parser.add_argument('-t', '--threshold', type = float, action='store', default=None, + help = 'Threshold voltage for digital ToT (in mV). DEFAULT value in yml OR 100mV if voltagecard not in yml') + +parser.add_argument('-E', '--errormax', action='store', type=int, default='100', + help='Maximum index errors allowed during decoding. DEFAULT 100') + +parser.add_argument('-r', '--maxruns', type=int, action='store', default=None, + help = 'Maximum number of readouts') + +parser.add_argument('-M', '--maxtime', type=float, action='store', default=None, + help = 'Maximum run time (in minutes)') + +parser.add_argument('--timeit', action="store_true", default=False, + help='Prints runtime from seeing a hit to finishing the decode to terminal') + +parser.add_argument('-L', '--loglevel', type=str, choices = ['D', 'I', 'E', 'W', 'C'], action="store", default='I', + help='Set loglevel used. Options: D - debug, I - info, E - error, W - warning, C - critical. DEFAULT: I') + +parser.add_argument +args = parser.parse_args() +logname = "./runlogs/AstropixRunlog_" + time.strftime("%Y%m%d-%H%M%S") + ".log" + +# Sets the loglevel +ll = args.loglevel +if ll == 'D': + loglevel = logging.DEBUG +elif ll == 'I': + loglevel = logging.INFO +elif ll == 'E': + loglevel = logging.ERROR +elif ll == 'W': + loglevel = logging.WARNING +elif ll == 'C': + loglevel = logging.CRITICAL + +# Logging +formatter = logging.Formatter('%(asctime)s:%(msecs)d.%(name)s.%(levelname)s:%(message)s') +fh = logging.FileHandler(logname) +fh.setFormatter(formatter) +sh = logging.StreamHandler() +sh.setFormatter(formatter) +logging.getLogger().addHandler(sh) +logging.getLogger().addHandler(fh) +logging.getLogger().setLevel(loglevel) + +logger = logging.getLogger(__name__) + +#If using v2, use injection created by injection card +#If using v3, use injection created with integrated DACs on chip +onchipBool = True if args.chipVer > 2 else False + +threshold_array=np.arange(40,241,5 ) ###### can change step here + +for VPDAC in [10,20,30,40]: +# for VPDAC in [10]: + change_VPDAC(f'config/{args.yaml}.yml',VPDAC) + for TuneDAC in [0,1,2,3,4,5,6,7]: + change_all_TuneDACs(f'config/{args.yaml}.yml', TuneDAC) + time.sleep(1) + for col in [0,1,2,3,4,5,6,7,8,9,10,11,12]: + # for col in [9]: + args.inject=[1,col] + ### outdir is currently specific to the machine used to run at goddard, change before running + args.outdir=f'E:/data/VPDAC_Testing_With_Grant/scan4/VPDAC_{VPDAC}/TuneDAC_{TuneDAC}/Col_{col}' + os.makedirs(args.outdir) + for threshold in threshold_array: + args.name=f'threshold_{threshold}mV' + print(f'{args.name}') + args.threshold=float(threshold) + success_bool=False + # beam_test.main(args) + while success_bool==False: + try: + beam_test.main(args) + success_bool=True + except: + print(f'An error occured on threshold {threshold}mV') + success_bool=False + time.sleep(1) \ No newline at end of file diff --git a/apx4_read.py b/apx4_read.py new file mode 100644 index 0000000..38dc886 --- /dev/null +++ b/apx4_read.py @@ -0,0 +1,217 @@ +# Copyright (C) 2025 the astropix team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from astropix import astropixRun +import os +import time +import logging +import argparse + +from astropix_analysis.fmt import AstroPix4Hit, AstroPix4Readout +from astropix_analysis.fileio import FileHeader#, apx_to_csv + + + +def setup_logger(level: str, file_path: str = None): + """Setup the logger. + + .. warning:: + + This should probably be factored out in a module that all the scripts + can use, rather than coding the same thing over and over again. + + Arguments + --------- + level : str + The logging level (D, I, W, E, C) + + file_path : str (optional) + The path to the output log file. + """ + _logging_dict = {'D': logging.DEBUG, 'I': logging.INFO, 'W': logging.WARNING, + 'E': logging.ERROR, 'C': logging.CRITICAL} + + try: + level = _logging_dict[level] + except KeyError: + raise RuntimeError(f'Unrecognized logging level "{level}"') + + formatter = logging.Formatter('%(asctime)s:%(msecs)d.%(name)s.%(levelname)s:%(message)s') + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logging.getLogger().handlers.clear() + logging.getLogger().addHandler(stream_handler) + if file_path is not None: + file_handler = logging.FileHandler(file_path) + file_handler.setFormatter(formatter) + logging.getLogger().addHandler(file_handler) + + logging.getLogger().setLevel(level) + logger = logging.getLogger(__name__) + return logger + + +def main(args): + """Configure and run an AstroPix 4 chip. + """ + # A couple of hard-coded parameters---this is currently only supporting AstroPix v4! + chip_version = 4 + ci_on_chip = True + + + # Latch the start date and time---this will be used for naming the output products. + start_datetime = time.strftime("%Y%m%d_%H%M%S") + + # Create the output folder. + output_folder = os.path.join(args.outdir, start_datetime) + os.makedirs(output_folder) + + # Setup the logger. + log_file_name = f'{start_datetime}.log' + log_file_path = os.path.join(output_folder, log_file_name) + logger = setup_logger(args.loglevel, log_file_path) + + # Setup the data acquisition. + logger.info('Configuring the chip...') + astro = astropixRun(chipversion=chip_version, inject=args.inject) + astro.asic_init(yaml=args.yaml, analog_col=args.analog) + astro.init_voltages(vthreshold=args.threshold) + + # If injection, ensure injection pixel is enabled and initialize. + if args.inject is not None: + astro.enable_pixel(args.inject[1], args.inject[0]) + astro.init_injection(inj_voltage=args.vinj, onchip=ci_on_chip) + + # Enable final configuration. + astro.enable_spi() + astro.asic_configure() + if chip_version == 4: + for row_i in range(13): + astro.update_asic_tdac_row(row_i) + logger.info('Chip fully configured!') + + # What is this doing? + astro.dump_fpga() + + if args.inject is not None: + astro.start_injection() + + # Save final configuration to output file + config_file_name = f'{args.yaml}_{start_datetime}.yml' + config_file_path = os.path.join(output_folder, config_file_name) + logger.info(f'Copying configuration to {config_file_path}...') + astro.write_conf_to_yaml(config_file_path) + + # Do we really need a second call to this? + astro.dump_fpga() + + # Setup exit conditions + if args.maxtime is not None: + stop_time = time.time() + args.maxtime * 60. + else: + stop_time = None + if args.maxruns is not None: + max_num_readouts = args.maxruns + else: + max_num_readouts = None + + # Preparation of the file header. Note we can put literally anything that is + # serializable in the header, and for the time being we are just grabbing + # anything that used to end up in the original log (i.e., data) file. + # This is an area where we might want to put some thought as to what the most + # sensible way to handle things is. + header_data = {} + header_data['configuration'] = astro.get_header_data() + header_data['args'] = args.__dict__ + header = FileHeader(AstroPix4Readout, header_data) + + # Open the output file and write the header. + data_file_name = f'{start_datetime}_data.apx' + data_file_path = os.path.join(output_folder, data_file_name) + logger.info(f'Opening binary file {data_file_path}...') + output_file = open(data_file_path, 'wb') + header.write(output_file) + + # Start the event loop. + # By enclosing the main loop in try/except we are able to capture keyboard interupts cleanly + readout_id = 0 + try: + while 1: + # Check the stop conditions. + if stop_time is not None and time.time() >= stop_time: + break + if max_num_readouts is not None and readout_id >= max_num_readouts: + break + # Go ahead and readout data. + readout_data = astro.get_readout() + if readout_data: + readout = AstroPix4Readout(readout_data, readout_id) + readout_id += 1 + if readout_id % args.prescale == 0: + print(f'{readout_id} readouts acquired, last is {readout}.') + readout.write(output_file) + + + # Ends program cleanly when a keyboard interupt is sent. + except KeyboardInterrupt: + logger.info('Keyboard interupt, exiting...') + finally: + logger.info(f'Data acquisition interrupted after {readout_id} readouts.') + output_file.close() + logger.info('Output file closed.') + + # Teardown the hardware. + if args.inject is not None: + astro.stop_injection() + astro.close_connection() + logger.info("Program terminated successfully!") + + #if args.saveascsv: + # file_path = apx_to_csv(data_file_path, AstroPix4Readout) + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description='Astropix 4 simple run control') + parser.add_argument('-o', '--outdir', default='.', required=False, + help='Output Directory for all data files') + parser.add_argument('-y', '--yaml', action='store', required=False, type=str, default = 'testconfig', + help = 'filepath (in config/ directory) .yml file containing chip configuration. ' + 'Default: config/testconfig.yml (All pixels off)') + parser.add_argument('-p', '--prescale', type=int, default=25, + help='Prescale factor for displaying hits in real time during data taking') + parser.add_argument('-c', '--saveascsv', action='store_true', default=False, + help='Convert output file to csv.') + parser.add_argument('-i', '--inject', action='store', default=None, type=int, nargs=2, + help = 'Turn on injection in the given row and column. Default: No injection') + parser.add_argument('-v','--vinj', action='store', default = None, type=float, + help = 'Specify injection voltage (in mV). DEFAULT None (uses value in yml)') + parser.add_argument('-a', '--analog', action='store', required=False, type=int, default = 0, + help = 'Turn on analog output in the given column. Default: Column 0.') + parser.add_argument('-t', '--threshold', type = float, action='store', default=None, + help = 'Threshold voltage for digital ToT (in mV). DEFAULT value in yml OR 100mV if voltagecard not in yml') + parser.add_argument('-r', '--maxruns', type=int, action='store', default=None, + help = 'Maximum number of readouts') + parser.add_argument('-M', '--maxtime', type=float, action='store', default=None, + help = 'Maximum run time (in minutes)') + parser.add_argument('-L', '--loglevel', type=str, choices = ['D', 'I', 'E', 'W', 'C'], action="store", default='I', + help='Set loglevel used. ' + 'Options: D - debug, I - info, E - error, W - warning, C - critical. DEFAULT: I') + + parser.add_argument + args = parser.parse_args() + + main(args) diff --git a/astropix.py b/astropix.py index 866fbf4..e83a2a8 100644 --- a/astropix.py +++ b/astropix.py @@ -367,6 +367,31 @@ def hits_present(self): return True else: return False + + def get_header_data(self): + """Return a dictionary with all the data for the data file header. + """ + vdacs=['thpmos', 'cardConf2','vcasc2', 'BL', 'cardConf5', 'cardConf6','vminuspix','thpix'] + vcardconfig = {} + for i,v in enumerate(vdacs): + vcardconfig[v] = self.vcard_vdac[i] + digitalconfig = {} + for key in self.asic.asic_config['digitalconfig']: + digitalconfig[key]=self.asic.asic_config['digitalconfig'][key][1] + biasconfig = {} + for key in self.asic.asic_config['biasconfig']: + biasconfig[key]=self.asic.asic_config['biasconfig'][key][1] + idacconfig = {} + for key in self.asic.asic_config['idacs']: + idacconfig[key]=self.asic.asic_config['idacs'][key][1] + if self.chipversion>2: + vdacconfig = {} + for key in self.asic.asic_config['vdacs']: + vdacconfig[key]=self.asic.asic_config['vdacs'][key][1] + arrayconfig = {} + for key in self.asic.asic_config['recconfig']: + arrayconfig[key]=self.asic.asic_config['recconfig'][key][1] + return (vcardconfig, digitalconfig, biasconfig, idacconfig, vdacconfig, arrayconfig) def get_log_header(self): """