Source code for seek.calibration.flux_calibration_transit

# SEEK is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# SEEK is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with SEEK.  If not, see <http://www.gnu.org/licenses/>.


'''
Created on Feb 4, 2016

author: cchang
'''

from __future__ import print_function, division, absolute_import, unicode_literals

import numpy as np

import ephem
from datetime import timedelta
from scipy import optimize
from collections import namedtuple

from seek.plugins import load_data
from seek.utils import parse_datetime
from seek.mitigation import sum_threshold                
from seek.calibration import astro_calibration_source
from seek.calibration import fitting
from seek.utils import sphere
from seek.utils import load_file

# define constants
HOUR = timedelta(1.0/24.)
MINUTE = timedelta(1.0/24./60)
k = 1.38e-23
c = 3e8

CalibrationSource = namedtuple("CalibrationSource", ["date", "azimut", "elevation", "target"])
GaussFitResult = namedtuple("GaussFitResult", ["gauss_A", "gauss_x0", "gauss_sigma", "sky1", "sky2", 
                                               "gauss_A_err", "gauss_x0_err", "gauss_sigma_err", "sky1_err", "sky2_err", 
                                               "rsquared"])

def _find_file_list(datetime, cal_paths, params):
    """
    Given a datetime, find files that corresponding to two hours from this datetime.
    This is used for calibration only.

    :param datetime: datetime of interest (transit time)
    :param cal_paths: path of calibration files
    :param params: context parameters
    :return: list of files
    """

    file_list_temp = []
    for path in cal_paths:
        file_datetime = load_data.get_observation_start(path, params.file_type)
        
        if (datetime-15*MINUTE) <= file_datetime < (datetime+2*HOUR):
            file_list_temp.append(path)
            
    return file_list_temp

def _get_source_dec(source, params):
    """
    Get the declination coordinate of a given source in degrees.

    :param source: name of source
    :param params: context parameters

    :return: declination coordinate of source in degrees
    """
    if source.target == 'Sun':
        obs = ephem.Observer()
        obs.lon = params.telescope_longitude
        obs.lat = params.telescope_latitude
        obs.elevation = params.telescope_elevation
        obs.pressure = 0
        obs.date = source.date + HOUR
        s = ephem.Sun(obs)
        source_dec = s.dec/np.pi*180.0

    else:
        time = source.date + timedelta(seconds=3600)
        ra, dec = sphere.altaz_to_ra_dec(time, source.azimut, source.elevation, params=params)
        source_dec = np.degrees(dec)
        
    return source_dec

[docs]def fit_gaussian_source(data, time_axis, rfi_mask, source_dec, ctx): """ Given a chunck of 2D data, fit a 1D Gaussian to each frequency. Here only return the amplitude. :param data: data :param time_axis: time axis :param rfi_mask: mask :param source_dec: declination coordinate of source :param ctx: context :return: amplitude of Gaussian fit """ return fit_gaussian_source_full(data, time_axis, rfi_mask, source_dec, ctx)[0]
[docs]def fit_gaussian_source_full(data, time_axis, rfi_mask, source_dec, params): """ Given a chunck of 2D data, fit a 1D Gaussian to each frequency. Here return all fitted parameters. :param data: data :param time_axis: time axis :param rfi_mask: mask :param source_dec: declination coordinate of source :param params: context parameters :return: all Gaussian fit parameters """ no_params = 5 fit_results = np.zeros((data.shape[0], 2*no_params+1)) deg = np.arange(data.shape[1]) * (time_axis[1] - time_axis[0]) * 60 * 60 * (360. / 24 / 60 / 60) * params.integration_time * np.cos(source_dec / 180. * np.pi) # deg for k in range(data.shape[0]): ratio = np.sum(rfi_mask[k]) / rfi_mask.shape[1] if ratio<0.5: # hard code this? x = deg[rfi_mask[k]==False] y = data[k][rfi_mask[k]==False] if params.calibration_sources=='CygA': fit, perr, rsquared = fitting.fit_func(x, y, 'gauss2', [params.calibration_chi1, np.mean(deg), 1.0, params.calibration_chi1, np.mean(deg)+5.0, 1.5, 0.0, 1e10]) fit_results[k, 0] = fit[0] else: fit, perr, rsquared = fitting.fit_func(x, y, 'gauss', [params.calibration_chi1, np.mean(deg), 1.0, 0.0, 1e10]) fit_results[k, :no_params] = fit fit_results[k, no_params:2*no_params] = perr fit_results[k, -1] = rsquared return GaussFitResult(*[fit_results[:, i] for i in range(fit_results.shape[1])])
# def _plot_fit(x, y, func, fit, rsquared, ax): # ax.plot(x, y, label="R^2: %4.2f"%rsquared) # ax.plot(x, func(x, *fit), lw=2.0, color="k") # ax.legend() def _sun_model_func(a, freq, data, gain): """ Define module for fitting sun template to gain files """ return np.sum((data - (gain * a[0] + (freq - a[2]) * a[1]))**2)
[docs]def fit_sun_to_gain(gauss_amp, freq, params): """ Takes the array of Gaussian fit parameters and fits the sun template to it. Then calculate the actual gain that converts the map ADU to K. """ gauss_median = np.ma.median(gauss_amp, axis=0).data ref = astro_calibration_source.source(params.calibration_sources, freq*1e6) Gain_median = ref/gauss_median Gain_median = np.nan_to_num(Gain_median) # outlier rejection -- need further generalization! mm = (Gain_median!=0) med = np.median(Gain_median[mm]) std = np.std(Gain_median[mm]) # this is to say that the gain should not vary drastically over frequency! Gain_median[(np.abs(Gain_median/med) > 3)|(np.abs(med/Gain_median) > 3)] = 0.0 # fit measurement with sun spectra gain_sun = load_file(params.gain_file_sun, skip_header=True) sun_freq = gain_sun[:,0] sun_gain = gain_sun[:,1] sun_Ae = gain_sun[:,3] # sun_sigma = gain_sun[:,2]*(2*np.sqrt(2*np.log(2)))*(1.22/1.028)/2 # sigma ==> FWHM ==> FNBW # interpolate onto frequency grid sun_gain_interp = np.interp(freq, sun_freq, sun_gain) sun_Ae_interp = np.interp(freq, sun_freq, sun_Ae) # adjust this cut when doing the templates mask = (Gain_median!=0) * (freq>1000) * (freq<1280) cal_freq = freq[mask] cal = Gain_median[mask] opt = optimize.minimize(_sun_model_func, (350.0, 0.0001, 1120), args=(cal_freq, cal, sun_gain_interp[mask]), method='Powell') cal_gain = sun_gain_interp * opt['x'][0] + (freq - opt['x'][2]) * opt['x'][1] # lam = c / (freq * 1e6) # map_to_k = (lam**2/(np.pi*(sun_sigma_interp/180*np.pi)**2)*cal_gain/2./k*1e-26)**(-1) map_to_k = (sun_Ae_interp * cal_gain / 2. / k * 1e-26)**(-1) # print(opt) # here's to see by eye weather the fit is doing what it's suppose to do... # import pylab as mplot # import seaborn as sns # sns.set_style("whitegrid") # sns.set_context("talk") # mplot.figure(figsize=(8,6)) # mplot.scatter(cal_freq, cal, c='r') # mplot.plot(freq, cal_gain, color='k') # mplot.ylim(0, 7e-7) # mplot.title(str(params.calibration_sources)+'; N='+str(len(Gauss))) # mplot.grid(True) # mplot.savefig(str(params.strategy_start)+'_'+str(params.calibration_sources)+'.png') # mplot.show() return np.vstack((freq, map_to_k)).T, freq[mask], (sun_Ae_interp[mask]*Gain_median[mask]/2./k*1e-26)**(-1)
def _parse_source(source_entry, cal_date): return CalibrationSource(parse_datetime(cal_date + '-' + source_entry[0]), np.radians(source_entry[1]), np.radians(source_entry[2]), source_entry[3].split(':')[1].strip() )
[docs]def calibrate(ctx, plot=False): """ Main function that performs the calibration, including the following steps: * loop through the calibration sources -- find the corresponding list of files -- RFI removal -- for each frequency fit 1D Gaussian to data -- divide amplitude by reference spectra -- store the final pseudo-gain value * take median pseudo-gain over all calibration sources * fit this final spectra with a template derived from Sun * store this final fit to memory :param ctx: context that contains all parameters for RFI removal and file paths :return: 2D array with first column frequency and second gain factor """ frequencies, sun_fits = process_calibration_transits(ctx.calibrations_paths, ctx) sun_fit_means = np.mean(np.vstack(sun_fits), axis=0) return np.vstack((frequencies, sun_fit_means)).T
[docs]def process_calibration_transits(calibration_paths, ctx): """ Derive gain as a function of frequency for each transit measurement. :param calibration_paths: path for calibration files :param ctx: context :return: gain as a function of frequency """ params = ctx.params sm_kwargs, di_kwargs = sum_threshold.get_sumthreshold_kwargs(params) sun_fits = [] source_cnt = 0 file_path = calibration_paths.values()[0][1] frequencies = load_data.load_tod(file_path, ctx).frequencies gain_file = load_file(ctx.params.gain_file_default) gain_template = np.interp(frequencies, gain_file[:, 0], gain_file[:, 1]).reshape(-1, 1) for cal_date, cal_paths in calibration_paths.items(): gauss_amps = [] # read in calibration files sources = np.genfromtxt(cal_paths[0], comments='#', delimiter=',', dtype=None) for source_entry in sources: source = _parse_source(source_entry, cal_date) # select only sources we want to use if source.target == params.calibration_sources: fit_result = process_source(source, cal_paths[1], gain_template, sm_kwargs, di_kwargs, ctx) if fit_result is not None: gauss_amps.append(fit_result.gauss_A) source_cnt += 1 gauss_amps = np.array(gauss_amps) gauss_amps = np.ma.array(gauss_amps, mask=gauss_amps==0.0) sun_fit = fit_sun_to_gain(gauss_amps, frequencies, params)[0] sun_fits.append(sun_fit[:, 1]) print('total calibration sources:', source_cnt) return frequencies, sun_fits
[docs]def process_source(source, file_paths, gain_template, sm_kwargs, di_kwargs, ctx): """ Process a single transit measurement. :param source: name of source :param file_paths: file path :param gain_template: template derived from the Sun :param sm_kwargs: SumThreshold parameters :param di_kwargs: SumThreshold parameters :param ctx: context :return: parameters for Gaussian fit """ # load data print('processing: ' + source.target, source.date) params = ctx.params file_list = _find_file_list(source.date, file_paths, params) if len(file_list) == 0: return None tod = load_data.load_tod(file_list, ctx) # cut further on time axis data_temp = tod.vx.copy() mask_temp = np.arange(len(tod.time_axis)) mask_temp = mask_temp[(tod.time_axis >= source.date.hour)*(tod.time_axis < source.date.hour+2)] data = data_temp[:,mask_temp] time_axis = tod.time_axis[mask_temp] # RFI removal rfi_mask = sum_threshold.get_rfi_mask(data/gain_template, chi_1=100000, #TODO: make configurable plotting=False, sm_kwargs=sm_kwargs, di_kwargs=di_kwargs) # fit Gaussian (everything, this is to get the sun size measurement) return fit_gaussian_source_full(data, time_axis, rfi_mask, _get_source_dec(source, params), params)