1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
|
# This files defines a class for metadata objects of sentinel images. Large part of the work depends on python readers
# from the tops toolbox.
from doris.doris_stack.functions.xml_query import xml_query
import warnings
import os
from doris.doris_stack.main_code.burst import BurstMeta
from doris.doris_stack.functions.swath_metadata import burst_coverage, swath_coverage, swath_precise
class SwathMeta(object):
# Class which stores and gathers information of a swath in a sentinel dataset
def __init__(self, path='', swath_no='1', pol='vv', xml='', data=''):
# Initialize object variables
# This will contain a list of burst objects
self.bursts = []
# The following contain the path of xml and data file. Also swath_no and polarisation are given.
self.swath_xml = ''
self.swath_data = ''
self.swath_no = ''
self.swath_pol = ''
# These variables contain the metadata and the convex hull of the bursts
self.metadata = []
self.coverage = []
self.burst_centers = []
self.burst_corners = []
self.burst_shapes = []
self.orbits = []
self.orbit_type = ''
# This function creates an swath object and searches for available data and xml files. It gives an error when
# either the path does not exist, no data or xml files can be found or the data and xml files do not match.'
if (not xml or not data) and not path:
warnings.warn('Please provide either a product path or xml and data path')
return
if not xml or not data:
warnings.warn('Data path is now used')
xml_dir = os.path.join(path, 'annotation')
xml = [f for f in os.listdir(xml_dir) if os.path.isfile(os.path.join(xml_dir, f))]
data_dir = os.path.join(path, 'measurement')
data = [f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))]
# Select polarisation
if not any(s in pol for s in ('hh','vv','hv','vh')):
warnings.warn('Polarisation not recognized, using default (vv)')
pol = 'vv'
if not swath_no in ('1','2','3'):
warnings.warn('Swath number not recognized, using default (1)')
xml = [os.path.join(path,'annotation',x) for x in xml if x[12:14] in pol and x[6] == swath_no]
data = [os.path.join(path,'measurement',x) for x in data if x[12:14] in pol and x[6] == swath_no]
# Check if the data is there and if the filenames coincide.
# print(xml + str(len(xml)))
# print(data + str(len(data)))
if type(xml) is str:
xml = [xml]
if type(data) is str:
data = [data]
if (len(xml) != 1 and type(xml) is list) or len(data) != 1:
warnings.warn('Total number of files should be one!')
if not os.path.exists(xml[0]) or not os.path.exists(data[0]):
warnings.warn('Either xml or data path does not exist')
if xml[0:-3] != data[0:-4]:
warnings.warn('xml and data file do not correspond.')
self.swath_xml = xml[0]
self.swath_data = data[0]
def meta_swath(self):
# This function reads and stores metadata of different swaths in the swath objects.
self.metadata = xml_query(self.swath_xml)
corners, self.coverage = swath_coverage(self.metadata)
self.burst_centers, self.burst_corners, self.burst_shapes = burst_coverage(self.metadata)
def orbits_swath(self, precise_folder=''):
# This functions loads the precise orbits for this swath
if not precise_folder:
print('xml information on orbit is used because no precise folder is specified')
orbits, type_orb = swath_precise(self.metadata, precise_folder=precise_folder, dat_type='XML')
else:
orbits, type_orb = swath_precise(self.metadata, precise_folder=precise_folder, dat_type='POE')
self.orbits = orbits
self.orbit_type = type_orb
return orbits, type_orb
def meta_burst(self, precise_folder=''):
# This function reads and stores metadata of different bursts in the bursts objects.
if not self.metadata:
self.meta_swath()
if not self.orbits:
self.orbits_swath(precise_folder=precise_folder)
bursts_num = len(self.metadata['aux']['azimuthTimeStart'])
if self.bursts:
self.bursts = []
for no in range(bursts_num):
self.bursts.append(BurstMeta(path='',swath_no=self.swath_no, pol=self.swath_pol, burst_num=no + 1,
xml=self.swath_xml, data=self.swath_data))
self.bursts[no].burst_center = self.burst_centers[no][0]
self.bursts[no].burst_coverage = self.burst_shapes[no]
self.bursts[no].burst_corners = self.burst_corners[no]
self.bursts[no].datapoints = self.orbits
self.bursts[no].orbit_type = self.orbit_type
|