1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
|
from __future__ import with_statement
import sys
try:
import numpy
import reflex
from pipeline_product import PipelineProduct
import pipeline_display
import reflex_plot_widgets
import matplotlib.gridspec as gridspec
import_success = True
except ImportError:
import_success = False
print "Error importing modules pyfits, wx, matplotlib, numpy"
def paragraph(text, width=None):
""" wrap text string into paragraph
text: text to format, removes leading space and newlines
width: if not None, wraps text, not recommended for tooltips as
they are wrapped by wxWidgets by default
"""
import textwrap
if width is None:
return textwrap.dedent(text).replace('\n', ' ').strip()
else:
return textwrap.fill(textwrap.dedent(text), width=width)
class DataPlotterManager(object):
# static members
recipe_name = "kmos_wave_cal"
det_img_wave_cat = "DET_IMG_WAVE"
lcal_cat = "LCAL"
def setWindowTitle(self):
return self.recipe_name+"_interactive"
def setInteractiveParameters(self):
return [
reflex.RecipeParameter(recipe=self.recipe_name, displayName="order",
group="Wavelength Calibration", description="The wavelength polynomial order [0,7]"),
reflex.RecipeParameter(recipe=self.recipe_name, displayName="dev_flip",
group="Wavelength Calibration", description="TRUE if the wavelengths are ascending on the detector from top to bottom"),
reflex.RecipeParameter(recipe=self.recipe_name, displayName="dev_disp",
group="Wavelength Calibration", description="The expected dispersion of the wavelength in microns/pixel"),
reflex.RecipeParameter(recipe=self.recipe_name, displayName="suppress_extension",
group="Wavelength Calibration", description="Suppress arbitrary filename extension"),
]
def readFitsData(self, fitsFiles):
self.frames = dict()
for f in fitsFiles:
self.frames[f.category] = PipelineProduct(f)
# Two cases: the file category is found or not found.
# Define the plotting functions in both cases
if self.det_img_wave_cat in self.frames:
# Get the wished files
det_img_wave = self.frames[self.det_img_wave_cat]
self.det_img_wave_name = det_img_wave.fits_file.name
# Get the angle values
self.angles_list = dict()
for i in range(18):
if (i+1 < len(det_img_wave.all_hdu)):
angle = det_img_wave.readKeyword('ESO PRO ROT NAANGLE', i+1)
if not (angle in self.angles_list) :
self.angles_list[angle] = []
self.angles_list[angle].append(i+1)
# Sorted angles list
self.sorted_angles = self.angles_list.keys()
self.sorted_angles.sort()
# Read the Plotting Data
self.argon_pos_data = []
self.argon_fwhm_data = []
self.neon_pos_data = []
self.neon_fwhm_data = []
for i in range(18):
if (i+1 < len(det_img_wave.all_hdu)):
key1 = det_img_wave.readKeyword('ESO QC ARC AR POS MEAN', i+1)
key2 = det_img_wave.readKeyword('ESO QC ARC AR FWHM MEAN', i+1)
ar_vscale = det_img_wave.readKeyword('ESO QC ARC AR VSCALE', i+1)
key3 = det_img_wave.readKeyword('ESO QC ARC NE POS MEAN', i+1)
key4 = det_img_wave.readKeyword('ESO QC ARC NE FWHM MEAN', i+1)
ne_vscale = det_img_wave.readKeyword('ESO QC ARC NE VSCALE', i+1)
# Correct Key2 and Key4 units
key2 /= ar_vscale
key4 /= ne_vscale
# Fill the Argon Data for the scatter plot
self.argon_pos_data.append(key1)
self.argon_fwhm_data.append(key2)
# Fill the Neon Data for the scatter plot
self.neon_pos_data.append(key3)
self.neon_fwhm_data.append(key4)
# Read the images
self.images = []
for i in range(18):
if (i+1 < len(det_img_wave.all_hdu)):
det_img_wave.readImage(i+1)
self.images.append(det_img_wave.image)
# Set the plotting functions
self._add_subplots = self._add_subplots
self._plot = self._data_plot
else:
# Set the plotting functions to NODATA ones
self._add_subplots = self._add_nodata_subplots
self._plot = self._nodata_plot
def addSubplots(self, figure):
self._add_subplots(figure)
def plotProductsGraphics(self):
self._plot()
def plotWidgets(self) :
widgets = list()
# Radio button
self.radiobutton = reflex_plot_widgets.InteractiveRadioButtons(self.axradiobutton, self.setRadioCallback, self.sorted_angles, 0, title='Angle selection')
widgets.append(self.radiobutton)
return widgets
def setRadioCallback(self, label) :
# Get Extensions as list
extensions = self.angles_list[float(label)]
if (len(extensions) == 3):
# Setup the image display
for i in range(3):
imgdisp = pipeline_display.ImageDisplay()
imgdisp.setAspect('equal')
imgdisp.display(self.img_plot[i], "Extension {0}".format(extensions[i]), self._data_plot_get_tooltip(extensions[i]), self.images[extensions[i]-1])
else:
print "Wrong number of extensions for this angle"
def _add_subplots(self, figure):
gs = gridspec.GridSpec(5, 3)
self.img_plot = []
for i in range(3):
self.img_plot.append(figure.add_subplot(gs[0:2,i]))
self.axradiobutton = figure.add_subplot(gs[3:,0])
self.argon_plot = figure.add_subplot(gs[3,1:])
self.neon_plot = figure.add_subplot(gs[4,1:])
def _data_plot_get_tooltip(self, extension):
# Create the tooltip
tooltip = " \
ESO QC ARC AR POS MEAN : %f \n \
ESO QC ARC AR FWHM MEAN : %f \n \
ESO QC ARC NE POS MEAN : %f \n \
ESO QC ARC NE FWHM MEAN : %f \
" % (self.argon_pos_data[extension-1],self.argon_fwhm_data[extension-1],self.neon_pos_data[extension-1],self.neon_fwhm_data[extension-1])
return tooltip
def _data_plot(self):
# Get Extensions as list
extensions = self.angles_list[self.sorted_angles[0]]
if (len(extensions) == 3):
# Setup the image display
for i in range(3):
imgdisp = pipeline_display.ImageDisplay()
imgdisp.setAspect('equal')
imgdisp.display(self.img_plot[i], "Extension {0}".format(extensions[i]), self._data_plot_get_tooltip(extensions[i]), self.images[extensions[i]-1])
else:
print "Wrong number of extensions for this angle"
# Define x
size = len(self.argon_pos_data)
x = numpy.linspace(1, size, num=size)
# Plot Argon and Neon plots
scadsp = pipeline_display.ScatterDisplay()
scadsp.display(self.argon_plot, "Argon Lines", "Mean Argon line positional offset, with error bars equal to the FWH", x, self.argon_pos_data,
self.argon_fwhm_data)
scadsp = pipeline_display.ScatterDisplay()
scadsp.setLabels('File Extensions 1 -> 18','Line Pos. (pix)')
scadsp.display(self.neon_plot, "Neon Lines", "Mean Neon line positional offset, with error bars equal to the FWH", x, self.neon_pos_data,
self.neon_fwhm_data)
def _add_nodata_subplots(self, figure):
self.img_plot = figure.add_subplot(1,1,1)
def _nodata_plot(self):
# could be moved to reflex library?
self.img_plot.set_axis_off()
text_nodata = "Data not found. Input files should contain this" \
" type:\n%s" % self.det_img_wave_cat
self.img_plot.text(0.1, 0.6, text_nodata, color='#11557c',
fontsize=18, ha='left', va='center', alpha=1.0)
self.img_plot.tooltip = 'No data found'
#This is the 'main' function
if __name__ == '__main__':
from reflex_interactive_app import PipelineInteractiveApp
# Create interactive application
interactive_app = PipelineInteractiveApp(enable_init_sop=True)
# get inputs from the command line
interactive_app.parse_args()
#Check if import failed or not
if not import_success:
interactive_app.setEnableGUI(False)
#Open the interactive window if enabled
if interactive_app.isGUIEnabled():
#Get the specific functions for this window
dataPlotManager = DataPlotterManager()
interactive_app.setPlotManager(dataPlotManager)
interactive_app.showGUI()
else:
interactive_app.set_continue_mode()
#Print outputs. This is parsed by the Reflex python actor to
#get the results. Do not remove
interactive_app.print_outputs()
sys.exit()
|