Skip to content

API Reference

Massipipe: A Python package for batch processing hyperspectral images.

MassiPipe is a data processing pipeline for hyperspectral images. It focuses on conversion to radiance, conversion to reflectance, and glint correction of images of shallow water environments.

The package was developed during a project which used a Resonon Pika-L hyperspectral camera, and parts of the package (radiance and irradiance conversion) are specific to this camera. However, other parts are more general and can be used with any hyperspectral image.

Config

Bases: BaseModel

Top-level configuration including standard SeaBee fields

Source code in massipipe/config.py
class Config(BaseModel):
    """Top-level configuration including standard SeaBee fields"""

    grouping: str = "grouping_name"
    area: str = "area_name"
    datetime: str = "197001010000"
    nfiles: PositiveInt = 1
    organisation: str = "organization_name"
    project: str = "project_name"
    creator_name: str = "creator_name"
    mosaic: bool = False
    classify: bool = False
    theme: str = "Habitat"
    spectrum_type: Literal["RGB", "MSI", "HSI"] = "HSI"
    massipipe_options: MassipipeOptions = MassipipeOptions()

    # Validation of datetime string
    # Note that string is not converted to datetime object,
    # this is to keep original string for template generation.
    @field_validator("datetime", mode="before")
    @classmethod
    def datetime_validate(cls, datetime_str: str) -> str:
        """Validate that datetime is formatted as YYYYmmdd or YYYYmmddHHMM.

        Returns the original string unchanged (no conversion to datetime).

        Raises
        ------
        ValueError
            If the string has an unexpected length, or does not parse as a
            valid date/time in the expected format.
        """
        # strptime itself raises ValueError on invalid input, so no
        # try/except-reraise wrapper is needed around it.
        if len(datetime_str) == 8:  # Expect YYYYmmdd
            datetime.strptime(datetime_str, "%Y%m%d")
        elif len(datetime_str) == 12:  # Expect YYYYmmddHHMM
            datetime.strptime(datetime_str, "%Y%m%d%H%M")
        else:
            raise ValueError("Invalid datetime string, use YYYYmmdd or YYYYmmddHHMM")
        return datetime_str

ImuDataParser

ImuDataParser is a utility class for parsing and processing IMU (Inertial Measurement Unit) data files generated by Resonon Airborne Hyperspectral imagers. It provides methods to read location files (.lcf), image line timestamps (.times) files, and interpolate the IMU data to match the image timestamps.

Source code in massipipe/georeferencing.py
class ImuDataParser:
    """
    ImuDataParser is a utility class for parsing and processing IMU (Inertial
    Measurement Unit) data files generated by Resonon Airborne Hyperspectral imagers. It
    provides methods to read location files (.lcf), image line timestamps (.times)
    files, and interpolate the IMU data to match the image timestamps.
    """

    @staticmethod
    def read_lcf_file(
        lcf_file_path: Union[str, Path],
        convert_to_unix_time: bool = True,
        time_rel_to_file_start: bool = False,
    ) -> Dict[str, NDArray]:
        """Read location files (.lcf) generated by Resonon Airborne Hyperspectral imager

        Parameters
        ----------
        lcf_file_path: Union[str, Path]
            Path to lcf file. Usually a "sidecar" file to an hyperspectral image with
            same "base" filename.
        convert_to_unix_time: bool, default True
            Whether to convert timestamps to UNIX time, which is standard for many
            computer / sensor systems. Timestamps in LCF file are originally in "GPS
            time" (see
            https://en.wikipedia.org/wiki/Global_Positioning_System#Timekeeping)
        time_rel_to_file_start: bool, default False
            Boolean indicating if first timestamp should be subtracted from each
            timestamp, making time relative to start of file. If True, the value of
            convert_to_unix_time has no effect.

        Returns
        -------
        lcf_data: Dict[str, NDArray]
            Dictionary with keys describing the type of data, and data formatted as
            numpy arrays. All arrays have equal length. The 7 types of data (keys) are:
                'time': System time in seconds (GPS time).
                'roll': Roll angle in radians, positive for "right wing up".
                'pitch': Pitch angle in radians, positive nose up.
                'yaw': (heading) in radians, zero at due North, PI/2 at due East.
                'longitude': Longitude in decimal degrees, negative for west longitude.
                'latitude': Latitude in decimal degrees, negative for southern
                hemisphere.
                'altitude': Altitude in meters relative to the WGS-84 ellipsoid.

        Raises
        ------
        Exception
            Re-raised (after logging) if the LCF file cannot be read.

        Notes
        -----
        The LCF file format was shared by Casey Smith at Resonon on February 16. 2021.
        The LCF specification was inherited from Space Computer Corp.
        """
        try:
            lcf_raw = np.loadtxt(lcf_file_path)
        except Exception as e:
            logger.error(f"Error reading LCF file {lcf_file_path}: {e}")
            raise
        # Fixed 7-column layout per the LCF specification (see Notes above)
        column_headers = [
            "time",
            "roll",
            "pitch",
            "yaw",
            "longitude",
            "latitude",
            "altitude",
        ]
        lcf_data = {header: lcf_raw[:, i] for i, header in enumerate(column_headers)}

        if convert_to_unix_time:
            # GPS-to-UNIX offset is constant over the file, so it is computed
            # from the first timestamp only and applied to the whole array.
            lcf_data["time"] += mpu.gps2unix(lcf_data["time"][0]) - lcf_data["time"][0]

        if time_rel_to_file_start:
            lcf_data["time"] -= lcf_data["time"][0]

        return lcf_data

    @staticmethod
    def read_times_file(
        times_file_path: Union[Path, str], time_rel_to_file_start: bool = True
    ) -> NDArray:
        """Read image line timestamps (.times) file generated by Resonon camera

        Parameters
        ----------
        times_file_path: Union[Path,str]
            Path to times file. Usually a "sidecar" file to an hyperspectral image with
            same "base" filename.
        time_rel_to_file_start: bool, default True
            Boolean indicating if times should be offset so that first timestamp is
            zero. If not, the original timestamp value is returned.

        Returns
        -------
        times: NDArray
            Numpy array containing timestamps for every line of the corresponding
            hyperspectral image. The timestamps are in units of seconds, and are
            relative to when the system started (values are usually within the 0-10000
            second range). If time_rel_to_file_start=True, the times are offset so that
            the first timestamp is zero. The first timestamp of the times file and the
            first timestamp of the corresponding lcf file (GPS/IMU data) are assumed to
            be recorded at exactly the same time. If both sets of timestamps are offset
            so that time is measured relative to the start of the file, the times can be
            used to calculate interpolated GPS/IMU values for each image line.

        Raises
        ------
        Exception
            Re-raised (after logging) if the times file cannot be read.
        """
        try:
            image_times = np.loadtxt(times_file_path)
        except Exception as e:
            logger.error(f"Error reading times file {times_file_path}: {e}")
            raise
        if time_rel_to_file_start:
            image_times = image_times - image_times[0]
        return image_times

    @staticmethod
    def interpolate_lcf_to_times(
        lcf_data: Dict[str, NDArray], image_times: NDArray, convert_to_list: bool = True
    ) -> Dict[str, Union[List, NDArray]]:
        """Interpolate LCF data to image line times

        Parameters
        ----------
        lcf_data : dict
            Dictionary with data read from *.lcf file
        image_times : NDArray
            Array of time stamps from *.times file
        convert_to_list : bool, default True
            Whether to convert interpolated LCF data to list (useful for saving data as
            JSON). If False, interpolated values in lcf_data_interp are formatted as
            NDArray.

        Returns
        -------
        lcf_data_interp: dict[list | NDArray]
            Version of lcf_data with all measured values interpolated to image
            timestamps.
        """
        lcf_data_interp = {}
        lcf_times = lcf_data["time"]

        # LCF times and image times are both measured in seconds, but with different
        # offsets. "Sync" image times so they correspond to LCF time offset:
        image_times = image_times - image_times[0] + lcf_times[0]

        for lcf_key, lcf_value in lcf_data.items():
            lcf_data_interp[lcf_key] = np.interp(image_times, lcf_times, lcf_value)
            if convert_to_list:
                lcf_data_interp[lcf_key] = lcf_data_interp[lcf_key].tolist()
        return lcf_data_interp

    def read_and_save_imu_data(
        self,
        lcf_path: Union[Path, str],
        times_path: Union[Path, str],
        json_path: Union[Path, str],
    ) -> None:
        """Parse *.lcf and *.times files and save as JSON

        Parameters
        ----------
        lcf_path : Union[Path, str]
            Path to *.lcf (IMU data) file
        times_path : Union[Path, str]
            Path to *.times (image line timestamps) file
        json_path : Union[Path, str]
            Path to output JSON file

        Raises
        ------
        Exception
            Re-raised (after logging) if reading, interpolating, or writing fails.
        """
        try:
            lcf_data = self.read_lcf_file(lcf_path)
            times_data = self.read_times_file(times_path)
            lcf_data_interp = self.interpolate_lcf_to_times(lcf_data, times_data)

            with open(json_path, "w", encoding="utf-8") as write_file:
                json.dump(lcf_data_interp, write_file, ensure_ascii=False, indent=4)
        except Exception as e:
            logger.error(f"Error processing IMU data: {e}")
            raise

read_lcf_file(lcf_file_path, convert_to_unix_time=True, time_rel_to_file_start=False) staticmethod

Read location files (.lcf) generated by Resonon Airborne Hyperspectral imager

Parameters:

Name Type Description Default
lcf_file_path Union[str, Path]

Path to lcf file. Usually a "sidecar" file to an hyperspectral image with same "base" filename.

required
convert_to_unix_time bool

Whether to convert timestamps to UNIX time, which is standard for many computer / sensor systems. Timestamps in LCF file are originally in "GPS time" (see https://en.wikipedia.org/wiki/Global_Positioning_System#Timekeeping)

True
time_rel_to_file_start bool

Boolean indicating if first timestamp should be subtracted from each timestamp, making time relative to start of file. If True, the value of convert_to_unix_time has no effect.

False

Returns:

Name Type Description
lcf_data Dict[str, NDArray]

Dictionary with keys describing the type of data, and data formatted as numpy arrays. All arrays have equal length. The 7 types of data (keys) are: 'time': System time in seconds (GPS time). 'roll': Roll angle in radians, positive for "right wing up". 'pitch': Pitch angle in radians, positive nose up. 'yaw': (heading) in radians, zero at due North, PI/2 at due East. 'longitude': Longitude in decimal degrees, negative for west longitude. 'latitude': Latitude in decimal degrees, negative for southern hemisphere. 'altitude': Altitude in meters relative to the WGS-84 ellipsoid.

Notes

The LCF file format was shared by Casey Smith at Resonon on February 16. 2021. The LCF specification was inherited from Space Computer Corp.

Source code in massipipe/georeferencing.py
@staticmethod
def read_lcf_file(
    lcf_file_path: Union[str, Path],
    convert_to_unix_time: bool = True,
    time_rel_to_file_start: bool = False,
) -> Dict[str, NDArray]:
    """Read location files (.lcf) generated by Resonon Airborne Hyperspectral imager

    Parameters
    ----------
    lcf_file_path: Union[str, Path]
        Path to lcf file. Usually a "sidecar" file to an hyperspectral image with
        same "base" filename.
    convert_to_unix_time: bool, default True
        Whether to convert timestamps to UNIX time, which is standard for many
        computer / sensor systems. Timestamps in LCF file are originally in "GPS
        time" (see
        https://en.wikipedia.org/wiki/Global_Positioning_System#Timekeeping)
    time_rel_to_file_start: bool, default False
        Boolean indicating if first timestamp should be subtracted from each
        timestamp, making time relative to start of file. If True, the value of
        convert_to_unix_time as no effect.

    Returns
    -------
    lcf_data: Dict[str, NDArray]
        Dictionary with keys describing the type of data, and data formatted as
        numpy arrays. All arrays have equal length. The 7 types of data (keys) are:
            'time': System time in seconds (GPS time).
            'roll': Roll angle in radians, positive for "right wing up".
            'pitch': Pitch angle in radians,positive nose up.
            'yaw': (heading) in radians, zero at due North, PI/2 at due East.
            'longitude': Longitude in decimal degrees, negative for west longitude.
            'latitude': Latitude in decimal degrees, negative for southern
            hemisphere.
            'altitude': Altitude in meters relative to the WGS-84 ellipsiod.

    Notes
    -----
    The LCF file format was shared by Casey Smith at Resonon on February 16. 2021.
    The LCF specification was inherited from Space Computer Corp.
    """
    try:
        lcf_raw = np.loadtxt(lcf_file_path)
    except Exception as e:
        logger.error(f"Error reading LCF file {lcf_file_path}: {e}")
        raise
    column_headers = [
        "time",
        "roll",
        "pitch",
        "yaw",
        "longitude",
        "latitude",
        "altitude",
    ]
    lcf_data = {header: lcf_raw[:, i] for i, header in enumerate(column_headers)}

    if convert_to_unix_time:
        lcf_data["time"] += mpu.gps2unix(lcf_data["time"][0]) - lcf_data["time"][0]

    if time_rel_to_file_start:
        lcf_data["time"] -= lcf_data["time"][0]

    return lcf_data

read_times_file(times_file_path, time_rel_to_file_start=True) staticmethod

Read image line timestamps (.times) file generated by Resonon camera

Parameters:

Name Type Description Default
times_file_path Union[Path, str]

Path to times file. Usually a "sidecar" file to an hyperspectral image with same "base" filename.

required
time_rel_to_file_start bool

Boolean indicating if times should be offset so that first timestamp is zero. If not, the original timestamp value is returned.

True

Returns:

Name Type Description
times NDArray

Numpy array containing timestamps for every line of the corresponding hyperspectral image. The timestamps are in units of seconds, and are relative to when the system started (values are usually within the 0-10000 second range). If time_rel_to_file_start=True, the times are offset so that the first timestamp is zero. The first timestamp of the times file and the first timestamp of the corresponding lcf file (GPS/IMU data) are assumed to be recorded at exactly the same time. If both sets of timestamps are offset so that time is measured relative to the start of the file, the times can be used to calculate interpolated GPS/IMU values for each image line.

Source code in massipipe/georeferencing.py
@staticmethod
def read_times_file(
    times_file_path: Union[Path, str], time_rel_to_file_start: bool = True
) -> NDArray:
    """Read image line timestamps (.times) file generated by Resonon camera

    Parameters
    ----------
    times_file_path: Union[Path,str]
        Path to times file. Usually a "sidecar" file to an hyperspectral image with
        same "base" filename.
    time_rel_to_file_start: bool, default True
        Boolean indicating if times should be offset so that first timestamp is
        zero. If not, the original timestamp value is returned.

    Returns
    -------
    times: NDArray
        Numpy array containing timestamps for every line of the corresponding
        hyperspectral image. The timestamps are in units of seconds, and are
        relative to when the system started (values are usually within the 0-10000
        second range). If time_rel_to_file_start=True, the times are offset so that
        the first timestamp is zero. The first timestamp of the times file and the
        first timestamp of the corresponding lcf file (GPS/IMU data) are assumed to
        the recorded at exactly the same time. If both sets of timestamps are offset
        so that time is measured relative to the start of the file, the times can be
        used to calculate interpolated GPS/IMU values for each image line.
    """
    try:
        image_times = np.loadtxt(times_file_path)
    except Exception as e:
        logger.error(f"Error reading times file {times_file_path}: {e}")
        raise
    if time_rel_to_file_start:
        image_times = image_times - image_times[0]
    return image_times

interpolate_lcf_to_times(lcf_data, image_times, convert_to_list=True) staticmethod

Interpolate LCF data to image line times

Parameters:

Name Type Description Default
lcf_data dict

Dictionary with data read from *.lcf file

required
image_times NDArray

Array of time stamps from *.times file

required
convert_to_list bool

Whether to convert interpolated LCF data to list (useful for saving data as JSON). If False, interpolated values in lcf_data_interp are formatted as NDArray.

True

Returns:

Name Type Description
lcf_data_interp dict[list | NDArray]

Version of lcf_data with all measured values interpolated to image timestamps.

Source code in massipipe/georeferencing.py
@staticmethod
def interpolate_lcf_to_times(
    lcf_data: Dict[str, NDArray], image_times: NDArray, convert_to_list: bool = True
) -> Dict[str, Union[List, NDArray]]:
    """Interpolate LCF data to image line times

    Parameters
    ----------
    lcf_data : dict
        Dictionary with data read from *.lcf file
    image_times : NDArray
        Array of time stamps from *.times file
    convert_to_list : bool, default True
        Whether to convert interpolated LCF data to list (useful for saving data as
        JSON). If False, interpolated values in lcf_data_interp are formatted as
        NDArray.

    Returns
    -------
    lcf_data_interp: dict[list | NDArray]
        Version of lcf_data with all measured values interpolated to image
        timestamps.
    """
    lcf_data_interp = {}
    lcf_times = lcf_data["time"]

    # LCF times and image times are both measured in seconds, but with different
    # offsets. "Sync" image times so they correspond to LCF time offset:
    image_times = image_times - image_times[0] + lcf_times[0]

    for lcf_key, lcf_value in lcf_data.items():
        lcf_data_interp[lcf_key] = np.interp(image_times, lcf_times, lcf_value)
        if convert_to_list:
            lcf_data_interp[lcf_key] = lcf_data_interp[lcf_key].tolist()
    return lcf_data_interp

read_and_save_imu_data(lcf_path, times_path, json_path)

Parse *.lcf and *.times files and save as JSON

Parameters:

Name Type Description Default
lcf_path Union[Path, str]

Path to *.lcf (IMU data) file

required
times_path Union[Path, str]

Path to *.times (image line timestamps) file

required
json_path Union[Path, str]

Path to output JSON file

required
Source code in massipipe/georeferencing.py
def read_and_save_imu_data(
    self,
    lcf_path: Union[Path, str],
    times_path: Union[Path, str],
    json_path: Union[Path, str],
) -> None:
    """Parse *.lcf and *.times files and save the interpolated result as JSON.

    Parameters
    ----------
    lcf_path : Union[Path, str]
        Path to *.lcf (IMU data) file
    times_path : Union[Path, str]
        Path to *.times (image line timestamps) file
    json_path : Union[Path, str]
        Path to output JSON file
    """
    try:
        imu_records = self.read_lcf_file(lcf_path)
        line_times = self.read_times_file(times_path)
        interpolated_records = self.interpolate_lcf_to_times(imu_records, line_times)

        with open(json_path, "w", encoding="utf-8") as output_file:
            json.dump(interpolated_records, output_file, ensure_ascii=False, indent=4)
    except Exception as e:
        logger.error(f"Error processing IMU data: {e}")
        raise

ImuGeoTransformer

Class for calculating affine geotransform based on IMU data and image shape.

Attributes:

Name Type Description
camera_opening_angle float

Full opening angle of camera, in radians.

pitch_offset float

Forward pointing angle of the camera relative to nadir, in radians.

roll_offset float

Right pointing angle of the camera relative to nadir, in radians.

altitude_offset float

Offset added to the estimated altitude.

utm_x_offset float

Offset in the UTM x-coordinate.

utm_y_offset float

Offset in the UTM y-coordinate.

imu_data dict

IMU data loaded from JSON file.

image_shape tuple

Shape of the hyperspectral image.

utm_x float

UTM x-coordinate of the IMU data.

utm_y float

UTM y-coordinate of the IMU data.

camera_origin NDArray

Origin of the camera in UTM coordinates.

t_total float

Total time duration of IMU data.

dt float

Sampling interval of IMU data.

v_alongtrack NDArray

Along-track velocity vector, in m/s.

u_alongtrack NDArray

Unit vector (easting, northing) pointing along flight direction

gsd_alongtrack float

Ground sampling distance along track.

swath_length float

Length of the swath along track.

mean_altitude float

Mean altitude of the UAV during imaging.

u_acrosstrack NDArray

Unit vector (easting, northing) pointing left relative to flight direction. The direction is chosen to match that of the image coordinate system: Origin in upper left corner, down (increasing line number) corresponds to positive along-track direction, right (increasing sample number) corresponds to positive cross-track direction.

swath_width float

Width of the swath across track.

gsd_acrosstrack float

Ground sampling distance across track.

image_origin NDArray

Coordinates of the image origin.

geotransform tuple

6-element affine transform for the image.

rotation_deg float

Image rotation in degrees.

utm_zone_number int

UTM zone number.

utm_zone_hemi str

UTM hemisphere (North or South).

envi_map_info str

ENVI "map info" string, including image rotation.

Source code in massipipe/georeferencing.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
class ImuGeoTransformer:
    """
    Class for calculating affine geotransform based on IMU data and image shape.

    Attributes
    ----------
    camera_opening_angle : float
        Full opening angle of camera, in radians.
    pitch_offset : float
        Forward pointing angle of the camera relative to nadir, in radians.
    roll_offset : float
        Right pointing angle of the camera relative to nadir, in radians.
    altitude_offset : float
        Offset added to the estimated altitude.
    utm_x_offset : float
        Offset in the UTM x-coordinate.
    utm_y_offset : float
        Offset in the UTM y-coordinate.
    imu_data : dict
        IMU data loaded from JSON file.
    image_shape : tuple
        Shape of the hyperspectral image.
    utm_x : float
        UTM x-coordinate of the IMU data.
    utm_y : float
        UTM y-coordinate of the IMU data.
    camera_origin : NDArray
        Origin of the camera in UTM coordinates.
    t_total : float
        Total time duration of IMU data.
    dt : float
        Sampling interval of IMU data.
    v_alongtrack : NDArray
        Along-track velocity vector, in m/s.
    u_alongtrack: NDArray
        Unit vector (easting, northing) pointing along flight direction
    gsd_alongtrack : float
        Ground sampling distance along track.
    swath_length : float
        Length of the swath along track.
    mean_altitude : float
        Mean altitude of the UAV during imaging.
    u_acrosstrack: NDArray
        Unit vector (easting, northing) pointing left relative to flight direction. The
        direction is chosen to match that of the image coordinate system: Origin in
        upper left corner, down (increasing line number) corresponds to positive
        along-track direction, right (increasing sample number) corresponds to positive
        cross-track direction.
    swath_width : float
        Width of the swath across track.
    gsd_acrosstrack : float
        Ground sampling distance across track.
    image_origin : NDArray
        Coordinates of the image origin.
    geotransform : tuple
        6-element affine transform for the image.
    rotation_deg : float
        Image rotation in degrees.
    utm_zone_number : int
        UTM zone number.
    utm_zone_hemi : str
        UTM hemisphere (North or South).
    envi_map_info : str
        ENVI "map info" string, including image rotation.

    """

    def __init__(
        self,
        imu_data_path: Union[Path, str],
        image_header_path: Union[Path, str],
        camera_opening_angle: Optional[float] = None,
        pitch_offset: Optional[float] = None,
        roll_offset: Optional[float] = None,
        altitude_offset: Optional[float] = None,
        utm_x_offset: Optional[float] = None,
        utm_y_offset: Optional[float] = None,
        assume_square_pixels: Optional[bool] = True,
    ):
        """Initialize image flight metadata object

        Parameters
        ----------
        imu_data_path : Union[Path, str]
            Path to JSON file with IMU data
        image_header_path : Union[Path, str]
            Path to header for hyperspectral image
        camera_opening_angle : float, default 36.5
            Full opening angle of camera, in degrees. Corresponds to angle between rays
            hitting leftmost and rightmost pixels of image.
        pitch_offset : float, default 0.0
            How much forward the camera is pointing relative to nadir (degrees)
        roll_offset : float, default 0.0
            How much to the right ("right wing up") the camera is pointing relative to
            nadir (degrees).
        altitude_offset : float, default 0.0
            Offset added to the estimated altitude. If the UAV was higher in reality
            than that estimated by the GeoTransformer object, add a positive
            altitude_offset.
        utm_x_offset, utm_y_offset: float, default 0.0
            How far from the real position the IMU data position is, measured in meters.
            If the IMU data is too far east, utm_x_offset is positive. If the IMU data
            is too far north, utm_y_offset is positive. The offset is assumed to be
            constant across a dataset.
        assume_square_pixels : bool, default True
            Whether to assume that the original image was acquired with flight
            parameters (flight speed, frame rate, altitude) that would produce square
            pixels. If true, the altitude of the camera is estimated from the shape of
            the image and the (along-track) swath length. This can be useful in cases
            where absolute altitude measurement of the camera IMU is not very accurate.
        """

        # Set default values. Use explicit "is None" checks so that an
        # explicitly passed value of 0.0 (or False) is respected instead of
        # being silently replaced by the default (the previous truthiness
        # test conflated "not provided" with "zero").
        if camera_opening_angle is None:
            camera_opening_angle = 36.5
        if pitch_offset is None:
            pitch_offset = 0.0
        if roll_offset is None:
            roll_offset = 0.0
        if altitude_offset is None:
            altitude_offset = 0.0
        if utm_x_offset is None:
            utm_x_offset = 0.0
        if utm_y_offset is None:
            utm_y_offset = 0.0
        # None means "use default" (True); anything else is coerced to bool
        assume_square_pixels = assume_square_pixels is None or bool(assume_square_pixels)

        # Set attributes, converting degrees to radians
        self.camera_opening_angle = camera_opening_angle * (np.pi / 180)
        self.pitch_offset = pitch_offset * (np.pi / 180)
        self.roll_offset = roll_offset * (np.pi / 180)
        self.altitude_offset = altitude_offset
        self.utm_x_offset = utm_x_offset
        self.utm_y_offset = utm_y_offset

        try:
            # Get imu data and image shape from files
            self.imu_data = mpu.read_json(imu_data_path)
            self.image_shape = mpu.get_image_shape(image_header_path)

            # Get UTM coordinates and CRS code
            self.utm_x, self.utm_y, self.utm_epsg = mpu.convert_long_lat_to_utm(
                self.imu_data["longitude"], self.imu_data["latitude"]
            )
            self.utm_x -= self.utm_x_offset
            self.utm_y -= self.utm_y_offset
            self.camera_origin = np.array([self.utm_x[0], self.utm_y[0]])

            # Time-related attributes
            t_total, dt = self._calc_time_attributes()
            self.t_total = t_total
            self.dt = dt

            # Along-track properties
            v_at, u_at, gsd_at, sl = self._calc_alongtrack_properties()
            self.v_alongtrack = v_at
            self.u_alongtrack = u_at
            self.gsd_alongtrack = gsd_at
            self.swath_length = sl

            # Altitude (must be computed after along-track GSD, which it may use)
            self.mean_altitude = self._calc_mean_altitude(assume_square_pixels)

            # Cross-track properties
            u_ct, sw, gsd_ct = self._calc_acrosstrack_properties()
            self.u_acrosstrack = u_ct
            self.swath_width = sw
            self.gsd_acrosstrack = gsd_ct

            # Image origin (image transform offset)
            self.image_origin = self._calc_image_origin()

            # Affine transform
            self.geotransform = self.get_image_transform(ordering="alphabetical")

            # Image rotation (degrees)
            self.rotation_deg = self._calc_image_rotation()

            # UTM zone
            utm_zone_number, utm_zone_hemi = self._get_utm_zone()
            self.utm_zone_number = utm_zone_number
            self.utm_zone_hemi = utm_zone_hemi

            # ENVI map info
            self.envi_map_info = self.get_envi_map_info()
        except Exception as e:
            logger.error(f"Error initializing GeoTransformer: {e}")
            raise

    def _calc_time_attributes(self) -> Tuple[np.floating[Any], np.floating[Any]]:
        """Calculate time duration and sampling interval of IMU data

        Returns
        -------
        t_total: float
            Total time duration of IMU data
        dt: float
            Sampling interval of IMU data
        """
        t = np.array(self.imu_data["time"])
        dt = np.mean(np.diff(t))  # Mean sampling interval (robust to jitter)
        t_total = len(t) * dt
        return t_total, dt

    def _calc_alongtrack_properties(
        self,
    ) -> Tuple[NDArray, NDArray, np.floating[Any], np.floating[Any]]:
        """Calculate along-track velocity, gsd, and swath length

        Returns
        -------
        v_alongtrack: NDArray
            Along-track velocity vector
        u_alongtrack: NDArray
            Along-track unit vector
        gsd_alongtrack: float
            Ground sampling distance along track
        swath_length: float
            Length of the swath along track
        """
        # Velocity estimated from straight line between first and last position
        vx_alongtrack = (self.utm_x[-1] - self.utm_x[0]) / self.t_total
        vy_alongtrack = (self.utm_y[-1] - self.utm_y[0]) / self.t_total
        v_alongtrack = np.array((vx_alongtrack, vy_alongtrack))
        v_alongtrack_abs = np.linalg.norm(v_alongtrack)
        u_alongtrack = v_alongtrack / v_alongtrack_abs

        swath_length = self.t_total * v_alongtrack_abs
        gsd_alongtrack = self.dt * v_alongtrack_abs

        return v_alongtrack, u_alongtrack, gsd_alongtrack, swath_length

    def _calc_mean_altitude(self, assume_square_pixels: bool) -> float:
        """Calculate mean altitude of UAV during imaging

        Parameters
        ----------
        assume_square_pixels: bool
            If true, the across-track sampling distance is assumed to be equal to the
            alongtrack sampling distance. The altitude is calculated based on this and
            the number of cross-track samples. If false, the mean of the altitude values
            from the imu data is used. In both cases, the altitude offset is added.

        Returns
        -------
        altitude: float
            Mean altitude of the UAV
        """
        if assume_square_pixels:
            # Altitude follows from swath width and the camera's opening angle
            swath_width = self.gsd_alongtrack * self.image_shape[1]
            altitude = swath_width / (2 * np.tan(self.camera_opening_angle / 2))
        else:
            altitude = np.mean(self.imu_data["altitude"])
        return altitude + self.altitude_offset

    def _calc_acrosstrack_properties(self) -> Tuple[NDArray, float, float]:
        """Calculate cross-track unit vector, swath width and sampling distance

        Returns
        -------
        u_acrosstrack: NDArray
            Cross-track unit vector
        swath_width: float
            Width of the swath across track
        gsd_acrosstrack: float
            Ground sampling distance across track
        """
        u_acrosstrack = np.array([-self.u_alongtrack[1], self.u_alongtrack[0]])  # Rotate 90 CCW
        swath_width = 2 * self.mean_altitude * np.tan(self.camera_opening_angle / 2)
        gsd_acrosstrack = swath_width / self.image_shape[1]
        return u_acrosstrack, swath_width, gsd_acrosstrack

    def _calc_image_origin(self) -> NDArray:
        """Calculate location of image pixel (0,0) in georeferenced coordinates (x,y)

        Returns
        -------
        image_origin: NDArray
            Coordinates of the image origin
        """
        alongtrack_offset = self.mean_altitude * np.tan(self.pitch_offset) * self.u_alongtrack
        acrosstrack_offset = self.mean_altitude * np.tan(self.roll_offset) * self.u_acrosstrack
        # NOTE: Signs of cross-track elements in equation below are "flipped" because
        # UTM coordinate system is right-handed and image coordinate system is
        # left-handed. If the camera_origin is in the middle of the top line of the
        # image, u_acrosstrack points away from the image origin (line 0, sample 0).
        image_origin = (
            self.camera_origin
            - 0.5 * self.swath_width * self.u_acrosstrack  # Edge of swath
            + acrosstrack_offset
            - alongtrack_offset
        )
        return image_origin

    def _calc_image_rotation(self) -> float:
        """Calculate image rotation in degrees (zero for image origin in NW corner)

        Returns
        -------
        rotation_deg: float
            Image rotation in degrees
        """
        rotation_rad = math.atan2(self.u_alongtrack[0], -self.u_alongtrack[1])
        rotation_deg = rotation_rad * (180.0 / math.pi)
        return rotation_deg

    def _get_utm_zone(self) -> Tuple[int, str]:
        """Get UTM zone and hemisphere (North or South)

        Returns
        -------
        utm_zone_number: int
            UTM zone number
        utm_zone_hemi: str
            UTM hemisphere (North or South)

        Raises
        ------
        ValueError
            If the EPSG code is not set, or does not describe a UTM zone
        """
        if self.utm_epsg is not None:
            utm_zone = pyproj.CRS.from_epsg(self.utm_epsg).utm_zone
            if utm_zone is None:
                raise ValueError("UTM zone undefined.")
        else:
            raise ValueError("EPSG value not set - UTM zone undefined.")
        # utm_zone is e.g. "32N": number followed by single hemisphere letter
        utm_zone_number = int(utm_zone[:-1])
        if utm_zone[-1] == "N":
            utm_zone_hemi = "North"
        else:
            utm_zone_hemi = "South"
        return utm_zone_number, utm_zone_hemi

    def get_image_transform(self, ordering: str = "alphabetical") -> Tuple[float, ...]:
        """Get 6-element affine transform for image

        Parameters
        ----------
        ordering : str, {"alphabetical","worldfile"}, default "alphabetical"
            If 'alphabetical', return A,B,C,D,E,F If 'worldfile', return A,D,B,E,C,F See
            https://en.wikipedia.org/wiki/World_file

        Returns
        -------
        transform: tuple[float]
            6-element affine transform

        Raises
        ------
        ValueError
            If invalid ordering parameter is used
        """
        A, D = self.gsd_acrosstrack * self.u_acrosstrack
        B, E = self.gsd_alongtrack * self.u_alongtrack
        C, F = self.image_origin

        if ordering == "alphabetical":
            return A, B, C, D, E, F
        elif ordering == "worldfile":
            return A, D, B, E, C, F
        else:
            error_msg = f"Invalid ordering argument {ordering}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    def get_envi_map_info(self) -> str:
        """Create ENVI "map info" string, including image rotation

        ENVI header files support a "map info" field which describes the geotransform
        for the image. The field has up to 11 parameters:

        1.  Projection name
        2.  Reference (tie point) pixel x location (in file coordinates)
        3.  Reference (tie point) pixel y location (in file coordinates)
        4.  Pixel easting
        5.  Pixel northing
        6.  x pixel size (on ground)
        7.  y pixel size (on ground)
        8.  Projection zone (UTM only)
        9.  North or South (UTM only)
        10. Datum
        11. Units

        NV5 / ENVI does not natively support geotransforms that are rotated. However,
        GDAL supports the use of specifying a rotation argument ("rotation=<degrees>")
        as the 11th argument (Units). This method creates a map_info string with this
        rotation syntax.

        Returns
        -------
        map_info: str
            String with map info formatted in ENVI header style. Example:
            "{UTM, 1, 1, 581226.666764, 7916192.56364, 5.2, 5.2,
            4, North, WGS-84, rotation=42}"

        References
        ----------
        https://www.nv5geospatialsoftware.com/docs/ENVIHeaderFiles.html
        https://github.com/ornldaac/AVIRIS-NG_ENVI-rotatedgrid
        https://gis.stackexchange.com/questions/229952/rotate-envi-hyperspectral-imagery-with-gdal
        https://trac.osgeo.org/gdal/ticket/1778#comment:8
        https://github.com/OSGeo/gdal/blob/master/frmts/raw/envidataset.cpp#L1393
        """
        # fmt: off
        map_info = [
            "UTM",                              # Projection name
            "1",                                # Ref. x pixel sample number, 1-based
            "1",                                # Ref. y pixel sample number, 1-based
            f"{self.image_origin[0]}",          # Ref. pixel easting
            f"{self.image_origin[1]}",          # Ref. pixel northing
            f"{self.gsd_acrosstrack}",          # X pixel size (across-track GSD)
            f"{self.gsd_alongtrack}",           # Y pixel size (along-track GSD)
            f"{self.utm_zone_number}",          # UTM zone number
            f"{self.utm_zone_hemi}",            # UTM hemisphere (North or South)
            "WGS-84",                           # Datum
            f"rotation={self.rotation_deg}",    # "Units" (accepts rotation arg.)
        ]
        map_info = "{" + ", ".join(map_info) + "}"
        # fmt: on
        return map_info

    def save_image_geotransform(self, geotransform_json_path: Union[Path, str]) -> None:
        """Save geotransform and related parameters as JSON file

        The following parameters are saved to file:

        utm_epsg: int
            Integer EPSG code describing UTM zone (CRS)
        geotransform: tuple
            6-element tuple (a,b,c,d,e,f) with affine transform
        map_info: str
            "map info" string in ENVI header format
        gsd_alongtrack: float
            Ground sampling distance (resolution) along flight track, in meters
        gsd_acrosstrack: float
            Ground sampling distance (resolution) across flight track, in meters
        swath_length: float
            Length of image swath (along track), in meters
        swath_width: float
            Width of image swath (across track), in meters
        rotation_deg: float
            Rotation of image in degrees, clockwise, +/- 180 degrees

        Parameters
        ----------
        geotransform_json_path : Union[Path, str]
            Path to JSON file
        """
        try:
            geotransform_data = {
                "utm_epsg": self.utm_epsg,
                "geotransform": self.geotransform,
                "envi_map_info": self.envi_map_info,
                "image_origin": {"x": self.image_origin[0], "y": self.image_origin[1]},
                "gsd_alongtrack": self.gsd_alongtrack,
                "gsd_acrosstrack": self.gsd_acrosstrack,
                "swath_length": self.swath_length,
                "swath_width": self.swath_width,
                "rotation_deg": self.rotation_deg,
            }

            with open(geotransform_json_path, "w", encoding="utf-8") as write_file:
                json.dump(geotransform_data, write_file, ensure_ascii=False, indent=4)
        except Exception as e:
            logger.error(f"Error saving geotransform data: {e}")
            raise
__init__(imu_data_path, image_header_path, camera_opening_angle=None, pitch_offset=None, roll_offset=None, altitude_offset=None, utm_x_offset=None, utm_y_offset=None, assume_square_pixels=True)

Parameters:

Name Type Description Default
imu_data_path Union[Path, str]

Path to JSON file with IMU data

required
image_header_path Union[Path, str]

Path to header for hyperspectral image

required
camera_opening_angle float

Full opening angle of camera, in degrees. Corresponds to angle between rays hitting leftmost and rightmost pixels of image.

36.5
pitch_offset float

How much forward the camera is pointing relative to nadir (degrees)

0.0
roll_offset float

How much to the right ("right wing up") the camera is pointing relative to nadir (degrees).

0.0
altitude_offset float

Offset added to the estimated altitude. If the UAV was higher in reality than that estimated by the GeoTransformer object, add a positive altitude_offset.

0.0
utm_x_offset Optional[float]

How far from the real position the IMU data position is, measured in meters. If the IMU data is too far east, utm_x_offset is positive. If the IMU data is too far north, utm_y_offset is positive. The offset is assumed to be constant across a dataset.

None
utm_y_offset Optional[float]

How far from the real position the IMU data position is, measured in meters. If the IMU data is too far east, utm_x_offset is positive. If the IMU data is too far north, utm_y_offset is positive. The offset is assumed to be constant across a dataset.

None
assume_square_pixels bool

Whether to assume that the original image was acquired with flight parameters (flight speed, frame rate, altitude) that would produce square pixels. If true, the altitude of the camera is estimated from the shape of the image and the (along-track) swath length. This can be useful in cases where absolute altitude measurement of the camera IMU is not very accurate.

True
Source code in massipipe/georeferencing.py
def __init__(
    self,
    imu_data_path: Union[Path, str],
    image_header_path: Union[Path, str],
    camera_opening_angle: Optional[float] = None,
    pitch_offset: Optional[float] = None,
    roll_offset: Optional[float] = None,
    altitude_offset: Optional[float] = None,
    utm_x_offset: Optional[float] = None,
    utm_y_offset: Optional[float] = None,
    assume_square_pixels: Optional[bool] = True,
):
    """Initialize image flight metadata object

    Parameters
    ----------
    imu_data_path : Union[Path, str]
        Path to JSON file with IMU data
    image_header_path : Union[Path, str]
        Path to header for hyperspectral image
    camera_opening_angle : float, default 36.5
        Full opening angle of camera, in degrees. Corresponds to angle between rays
        hitting leftmost and rightmost pixels of image.
    pitch_offset : float, default 0.0
        How much forward the camera is pointing relative to nadir (degrees)
    roll_offset : float, default 0.0
        How much to the right ("right wing up") the camera is pointing relative to
        nadir (degrees).
    altitude_offset : float, default 0.0
        Offset added to the estimated altitude. If the UAV was higher in reality
        than that estimated by the GeoTransformer object, add a positive
        altitude_offset.
    utm_x_offset, utm_y_offset: float, default 0.0
        How far from the real position the IMU data position is, measured in meters.
        If the IMU data is too far east, utm_x_offset is positive. If the IMU data
        is too far north, utm_y_offset is positive. The offset is assumed to be
        constant across a dataset.
    assume_square_pixels : bool, default True
        Whether to assume that the original image was acquired with flight
        parameters (flight speed, frame rate, altitude) that would produce square
        pixels. If true, the altitude of the camera is estimated from the shape of
        the image and the (along-track) swath length. This can be useful in cases
        where absolute altitude measurement of the camera IMU is not very accurate.
    """

    # Set default values
    camera_opening_angle = camera_opening_angle if camera_opening_angle else 36.5
    pitch_offset = pitch_offset if pitch_offset else 0.0
    roll_offset = roll_offset if roll_offset else 0.0
    altitude_offset = altitude_offset if altitude_offset else 0.0
    utm_x_offset = utm_x_offset if utm_x_offset else 0.0
    utm_y_offset = utm_y_offset if utm_y_offset else 0.0
    assume_square_pixels = (
        True if assume_square_pixels or (assume_square_pixels is None) else False
    )

    # Set attributes, converting degrees to radians
    self.camera_opening_angle = camera_opening_angle * (np.pi / 180)
    self.pitch_offset = pitch_offset * (np.pi / 180)
    self.roll_offset = roll_offset * (np.pi / 180)
    self.altitude_offset = altitude_offset
    self.utm_x_offset = utm_x_offset
    self.utm_y_offset = utm_y_offset

    try:
        # Get imu data and image shape from files
        self.imu_data = mpu.read_json(imu_data_path)
        self.image_shape = mpu.get_image_shape(image_header_path)

        # Get UTM coordinates and CRS code
        self.utm_x, self.utm_y, self.utm_epsg = mpu.convert_long_lat_to_utm(
            self.imu_data["longitude"], self.imu_data["latitude"]
        )
        self.utm_x -= self.utm_x_offset
        self.utm_y -= self.utm_y_offset
        self.camera_origin = np.array([self.utm_x[0], self.utm_y[0]])

        # Time-related attributes
        t_total, dt = self._calc_time_attributes()
        self.t_total = t_total
        self.dt = dt

        # Along-track properties
        v_at, u_at, gsd_at, sl = self._calc_alongtrack_properties()
        self.v_alongtrack = v_at
        self.u_alongtrack = u_at
        self.gsd_alongtrack = gsd_at
        self.swath_length = sl

        # Altitude
        self.mean_altitude = self._calc_mean_altitude(assume_square_pixels)

        # Cross-track properties
        u_ct, sw, gsd_ct = self._calc_acrosstrack_properties()
        self.u_acrosstrack = u_ct
        self.swath_width = sw
        self.gsd_acrosstrack = gsd_ct

        # Image origin (image transform offset)
        self.image_origin = self._calc_image_origin()

        # Affine transform
        self.geotransform = self.get_image_transform(ordering="alphabetical")

        # Image rotation (degrees)
        self.rotation_deg = self._calc_image_rotation()

        # UTM zone
        utm_zone_number, utm_zone_hemi = self._get_utm_zone()
        self.utm_zone_number = utm_zone_number
        self.utm_zone_hemi = utm_zone_hemi

        # ENVI map info
        self.envi_map_info = self.get_envi_map_info()
    except Exception as e:
        logger.error(f"Error initializing GeoTransformer: {e}")
        raise

_calc_time_attributes()

Calculate time duration and sampling interval of IMU data

Returns:

Name Type Description
t_total float

Total time duration of IMU data

dt float

Sampling interval of IMU data

Source code in massipipe/georeferencing.py
def _calc_time_attributes(self) -> Tuple[np.floating[Any], np.floating[Any]]:
    """Calculate time duration and sampling interval of IMU data

    Returns
    -------
    t_total: float
        Total time duration of IMU data
    dt: float
        Sampling interval of IMU data
    """
    t = np.array(self.imu_data["time"])
    dt = np.mean(np.diff(t))
    t_total = len(t) * dt
    return t_total, dt

_calc_alongtrack_properties()

Calculate along-track velocity, gsd, and swath length

Returns:

Name Type Description
v_alongtrack NDArray

Along-track velocity vector

u_alongtrack NDArray

Along-track unit vector

gsd_alongtrack float

Ground sampling distance along track

swath_length float

Length of the swath along track

Source code in massipipe/georeferencing.py
def _calc_alongtrack_properties(
    self,
) -> Tuple[NDArray, NDArray, np.floating[Any], np.floating[Any]]:
    """Calculate along-track velocity, gsd, and swath length

    Returns
    -------
    v_alongtrack: NDArray
        Along-track velocity vector
    u_alongtrack: NDArray
        Along-track unit vector
    gsd_alongtrack: float
        Ground sampling distance along track
    swath_length: float
        Length of the swath along track
    """
    vx_alongtrack = (self.utm_x[-1] - self.utm_x[0]) / self.t_total
    vy_alongtrack = (self.utm_y[-1] - self.utm_y[0]) / self.t_total
    v_alongtrack = np.array((vx_alongtrack, vy_alongtrack))
    v_alongtrack_abs = np.linalg.norm(v_alongtrack)
    u_alongtrack = v_alongtrack / v_alongtrack_abs

    swath_length = self.t_total * v_alongtrack_abs
    gsd_alongtrack = self.dt * v_alongtrack_abs

    return v_alongtrack, u_alongtrack, gsd_alongtrack, swath_length

_calc_mean_altitude(assume_square_pixels)

Calculate mean altitude of UAV during imaging

Parameters:

Name Type Description Default
assume_square_pixels bool

If true, the across-track sampling distance is assumed to be equal to the alongtrack sampling distance. The altitude is calculated based on this and the number of cross-track samples. If false, the mean of the altitude values from the imu data is used. In both cases, the altitude offset is added.

required

Returns:

Name Type Description
altitude float

Mean altitude of the UAV

Source code in massipipe/georeferencing.py
def _calc_mean_altitude(self, assume_square_pixels: bool) -> float:
    """Calculate mean altitude of UAV during imaging

    Parameters
    ----------
    assume_square_pixels: bool
        If true, the across-track sampling distance is assumed to be equal to the
        alongtrack sampling distance. The altitude is calculated based on this and
        the number of cross-track samples. If false, the mean of the altitude values
        from the imu data is used. In both cases, the altitude offset is added.

    Returns
    -------
    altitude: float
        Mean altitude of the UAV
    """
    if assume_square_pixels:
        swath_width = self.gsd_alongtrack * self.image_shape[1]
        altitude = swath_width / (2 * np.tan(self.camera_opening_angle / 2))
    else:
        altitude = np.mean(self.imu_data["altitude"])
    return altitude + self.altitude_offset

_calc_acrosstrack_properties()

Calculate cross-track unit vector, swath width and sampling distance

Returns:

Name Type Description
u_acrosstrack NDArray

Cross-track unit vector

swath_width float

Width of the swath across track

gsd_acrosstrack float

Ground sampling distance across track

Source code in massipipe/georeferencing.py
def _calc_acrosstrack_properties(self) -> Tuple[NDArray, float, float]:
    """Calculate cross-track unit vector, swath width and sampling distance

    Returns
    -------
    u_acrosstrack: NDArray
        Cross-track unit vector
    swath_width: float
        Width of the swath across track
    gsd_acrosstrack: float
        Ground sampling distance across track
    """
    u_acrosstrack = np.array([-self.u_alongtrack[1], self.u_alongtrack[0]])  # Rotate 90 CCW
    swath_width = 2 * self.mean_altitude * np.tan(self.camera_opening_angle / 2)
    gsd_acrosstrack = swath_width / self.image_shape[1]
    return u_acrosstrack, swath_width, gsd_acrosstrack

_calc_image_origin()

Calculate location of image pixel (0,0) in georeferenced coordinates (x,y)

Returns:

Name Type Description
image_origin NDArray

Coordinates of the image origin

Source code in massipipe/georeferencing.py
def _calc_image_origin(self) -> NDArray:
    """Calculate location of image pixel (0,0) in georeferenced coordinates (x,y)

    Returns
    -------
    image_origin: NDArray
        Coordinates of the image origin
    """
    alongtrack_offset = self.mean_altitude * np.tan(self.pitch_offset) * self.u_alongtrack
    acrosstrack_offset = self.mean_altitude * np.tan(self.roll_offset) * self.u_acrosstrack
    # NOTE: Signs of cross-track elements in equation below are "flipped" because
    # UTM coordinate system is right-handed and image coordinate system is
    # left-handed. If the camera_origin is in the middle of the top line of the
    # image, u_acrosstrack points away from the image origin (line 0, sample 0).
    image_origin = (
        self.camera_origin
        - 0.5 * self.swath_width * self.u_acrosstrack  # Edge of swath
        + acrosstrack_offset
        - alongtrack_offset
    )
    return image_origin

_calc_image_rotation()

Calculate image rotation in degrees (zero for image origin in NW corner)

Returns:

Name Type Description
rotation_deg float

Image rotation in degrees

Source code in massipipe/georeferencing.py
def _calc_image_rotation(self) -> float:
    """Calculate image rotation in degrees (zero for image origin in NW corner)

    Returns
    -------
    rotation_deg: float
        Image rotation in degrees
    """
    rotation_rad = math.atan2(self.u_alongtrack[0], -self.u_alongtrack[1])
    rotation_deg = rotation_rad * (180.0 / math.pi)
    return rotation_deg

_get_utm_zone()

Get UTM zone and hemisphere (North or South)

Returns:

Name Type Description
utm_zone_number int

UTM zone number

utm_zone_hemi str

UTM hemisphere (North or South)

Source code in massipipe/georeferencing.py
def _get_utm_zone(self) -> Tuple[int, str]:
    """Get UTM zone and hemisphere (North or South)

    Returns
    -------
    utm_zone_number: int
        UTM zone number
    utm_zone_hemi: str
        UTM hemisphere (North or South)
    """
    # Guard clauses: EPSG code must be set, and must map to a UTM zone
    if self.utm_epsg is None:
        raise ValueError("EPSG value not set - UTM zone undefined.")
    utm_zone = pyproj.CRS.from_epsg(self.utm_epsg).utm_zone
    if utm_zone is None:
        raise ValueError("UTM zone undefined.")
    # pyproj returns e.g. "33N": digits are the zone number, last char the hemisphere
    zone_number = int(utm_zone[:-1])
    hemisphere = "North" if utm_zone[-1] == "N" else "South"
    return zone_number, hemisphere

get_image_transform(ordering='alphabetical')

Get 6-element affine transform for image

Parameters:

Name Type Description Default
ordering (str, {alphabetical, worldfile})

If 'alphabetical', return A,B,C,D,E,F If 'worldfile', return A,D,B,E,C,F See https://en.wikipedia.org/wiki/World_file

"alphabetical"

Returns:

Name Type Description
transform tuple[float]

6-element affine transform

Raises:

Type Description
ValueError

If invalid ordering parameter is used

Source code in massipipe/georeferencing.py
def get_image_transform(self, ordering: str = "alphabetical") -> Tuple[float, ...]:
    """Get 6-element affine transform for image

    Parameters
    ----------
    ordering : str, {"alphabetical","worldfile"}, default "alphabetical"
        If 'alphabetical', return A,B,C,D,E,F. If 'worldfile', return A,D,B,E,C,F.
        See https://en.wikipedia.org/wiki/World_file

    Returns
    -------
    transform: tuple[float]
        6-element affine transform

    Raises
    ------
    ValueError
        If invalid ordering parameter is used
    """
    # Pixel-size vectors scaled by ground sampling distance, plus origin
    a, d = self.gsd_acrosstrack * self.u_acrosstrack
    b, e = self.gsd_alongtrack * self.u_alongtrack
    c, f = self.image_origin

    if ordering == "alphabetical":
        return a, b, c, d, e, f
    if ordering == "worldfile":
        return a, d, b, e, c, f

    error_msg = f"Invalid ordering argument {ordering}"
    logger.error(error_msg)
    raise ValueError(error_msg)

get_envi_map_info()

Create ENVI "map info" string, including image rotation

ENVI header files support a "map info" field which describes the geotransform for the image. The field has up to 11 parameters:

  1. Projection name
  2. Reference (tie point) pixel x location (in file coordinates)
  3. Reference (tie point) pixel y location (in file coordinates)
  4. Pixel easting
  5. Pixel northing
  6. x pixel size (on ground)
  7. y pixel size (on ground)
  8. Projection zone (UTM only)
  9. North or South (UTM only)
  10. Datum
  11. Units

NV5 / ENVI does not natively support geotransforms that are rotated. However, GDAL supports the use of specifying a rotation argument ("rotation=") as the 11th argument (Units). This method creates a map_info string with this rotation syntax.

Returns: map_info (str): String with map info formatted in ENVI header style. Example: "{UTM, 1, 1, 581226.666764, 7916192.56364, 5.2, 5.2, 4, North, WGS-84, rotation=42}"

References

https://www.nv5geospatialsoftware.com/docs/ENVIHeaderFiles.html https://github.com/ornldaac/AVIRIS-NG_ENVI-rotatedgrid https://gis.stackexchange.com/questions/229952/rotate-envi-hyperspectral-imagery-with-gdal https://trac.osgeo.org/gdal/ticket/1778#comment:8 https://github.com/OSGeo/gdal/blob/master/frmts/raw/envidataset.cpp#L1393

Source code in massipipe/georeferencing.py
def get_envi_map_info(self) -> str:
    """Create ENVI "map info" string, including image rotation

    ENVI header files support a "map info" field which describes the
    geotransform for the image. The field has up to 11 parameters:

    1.  Projection name
    2.  Reference (tie point) pixel x location (in file coordinates)
    3.  Reference (tie point) pixel y location (in file coordinates)
    4.  Pixel easting
    5.  Pixel northing
    6.  x pixel size (on ground)
    7.  y pixel size (on ground)
    8.  Projection zone (UTM only)
    9.  North or South (UTM only)
    10. Datum
    11. Units

    NV5 / ENVI does not natively support rotated geotransforms, but GDAL
    accepts a "rotation=<degrees>" value in the 11th slot (Units). This method
    builds a map_info string using that rotation syntax.

    Returns
    -------
    map_info: str
        Map info formatted in ENVI header style, e.g.
        "{UTM, 1, 1, 581226.666764, 7916192.56364, 5.2, 5.2, 4, North,
        WGS-84, rotation=42}"

    References
    ----------
    https://www.nv5geospatialsoftware.com/docs/ENVIHeaderFiles.html
    https://github.com/ornldaac/AVIRIS-NG_ENVI-rotatedgrid
    https://gis.stackexchange.com/questions/229952/rotate-envi-hyperspectral-imagery-with-gdal
    https://trac.osgeo.org/gdal/ticket/1778#comment:8
    https://github.com/OSGeo/gdal/blob/master/frmts/raw/envidataset.cpp#L1393
    """
    fields = (
        "UTM",                                 # Projection name
        "1",                                   # Ref. x pixel sample number, 1-based
        "1",                                   # Ref. y pixel sample number, 1-based
        str(self.image_origin[0]),             # Ref. pixel easting
        str(self.image_origin[1]),             # Ref. pixel northing
        str(self.gsd_acrosstrack),             # X pixel size (across-track GSD)
        str(self.gsd_alongtrack),              # Y pixel size (along-track GSD)
        str(self.utm_zone_number),             # UTM zone number
        str(self.utm_zone_hemi),               # UTM hemisphere (North or South)
        "WGS-84",                              # Datum
        "rotation=" + str(self.rotation_deg),  # "Units" slot carries rotation arg.
    )
    return "{" + ", ".join(fields) + "}"

save_image_geotransform(geotransform_json_path)

Save geotransform and related parameters as JSON file

The following parameters are saved to file: utm_epsg (int): integer EPSG code describing the UTM zone (CRS); geotransform (tuple): 6-element tuple (a, b, c, d, e, f) with the affine transform; map_info (str): "map info" string in ENVI header format; gsd_alongtrack (float): ground sampling distance (resolution) along the flight track, in meters; gsd_acrosstrack (float): ground sampling distance (resolution) across the flight track, in meters; swath_length (float): length of the image swath (along track), in meters; swath_width (float): width of the image swath (across track), in meters; rotation_deg (float): rotation of the image in degrees, clockwise, +/- 180 degrees.

Parameters:

Name Type Description Default
geotransform_json_path Union[Path, str]

Path to JSON file

required
Source code in massipipe/georeferencing.py
def save_image_geotransform(self, geotransform_json_path: Union[Path, str]) -> None:
    """Save geotransform and related parameters as JSON file

    The saved fields are: utm_epsg (integer EPSG code describing the UTM zone
    / CRS), geotransform (6-element affine transform (a,b,c,d,e,f)),
    envi_map_info ("map info" string in ENVI header format), image_origin
    (x/y coordinates), gsd_alongtrack and gsd_acrosstrack (ground sampling
    distances along / across the flight track, in meters), swath_length and
    swath_width (swath extent in meters), and rotation_deg (image rotation in
    degrees, clockwise, +/- 180 degrees).

    Parameters
    ----------
    geotransform_json_path : Union[Path, str]
        Path to JSON file
    """
    try:
        # Collect all georeferencing parameters in a JSON-serializable dict
        payload = {
            "utm_epsg": self.utm_epsg,
            "geotransform": self.geotransform,
            "envi_map_info": self.envi_map_info,
            "image_origin": {"x": self.image_origin[0], "y": self.image_origin[1]},
            "gsd_alongtrack": self.gsd_alongtrack,
            "gsd_acrosstrack": self.gsd_acrosstrack,
            "swath_length": self.swath_length,
            "swath_width": self.swath_width,
            "rotation_deg": self.rotation_deg,
        }
        with open(geotransform_json_path, "w", encoding="utf-8") as write_file:
            json.dump(payload, write_file, ensure_ascii=False, indent=4)
    except Exception as e:
        # Log before re-raising so failed exports show up in the pipeline log
        logger.error(f"Error saving geotransform data: {e}")
        raise

FlatSpecGlintCorrector

Perform glint correction on reflectance images using a flat glint spectrum assumption.

This class removes glint by subtracting the mean near-infrared (NIR) value from each spectrum, based on the assumption that the glint is spectrally flat. Optionally, it applies smoothing with a Savitzky-Golay filter to the corrected image.

Source code in massipipe/glint.py
class FlatSpecGlintCorrector:
    """
    Glint correction for reflectance images assuming a spectrally flat glint.

    Glint is removed by subtracting the per-pixel mean near-infrared (NIR) value
    from every band, i.e. the glint is modeled as a constant spectral offset.
    Optionally, the corrected image is smoothed with a Savitzky-Golay filter.
    """

    def __init__(self, smooth_with_savitsky_golay: bool = True):
        """Initialize glint corrector

        Parameters
        ----------
        smooth_with_savitsky_golay : bool, default True
            If True, smooth glint corrected images using a Savitzky-Golay filter.
        """
        self.smooth_with_savitsky_golay = smooth_with_savitsky_golay

    def glint_correct_image(self, refl_image: NDArray, refl_wl: NDArray) -> NDArray:
        """
        Remove sun and sky glint from an image by assuming a flat glint spectrum.

        Parameters
        ----------
        refl_image : NDArray
            Reflectance image with shape (n_lines, n_samples, n_bands).
        refl_wl : NDArray
            Wavelength vector (nm) for each band in refl_image.

        Returns
        -------
        NDArray
            Glint corrected reflectance image with the same shape as refl_image.
            Only the spectral offset changes; the overall spectral shape is
            preserved.

        Notes
        -----
        - Assumes negligible water-leaving radiance in the NIR region.
        - Assumes that the sun and sky glint have a flat spectrum, which can be
          a close approximation.
        """
        # Per-pixel glint estimate: mean reflectance over the NIR bands
        nir_band_ind = mpu.get_nir_ind(refl_wl)
        glint_estimate = np.mean(refl_image[:, :, nir_band_ind], axis=2, keepdims=True)
        corrected_image = refl_image - glint_estimate

        if not self.smooth_with_savitsky_golay:
            return corrected_image
        return mpu.savitzky_golay_filter(corrected_image)

    def glint_correct_image_file(
        self,
        image_path: Union[Path, str],
        glint_corr_image_path: Union[Path, str],
        **kwargs: Any,
    ):
        """
        Read an ENVI header file, perform glint correction, and save the corrected image.

        Parameters
        ----------
        image_path : Union[Path, str]
            Path to the hyperspectral image header.
        glint_corr_image_path : Union[Path, str]
            Path to save the glint corrected image header.
        """
        input_image, wavelengths, header_metadata = mpu.read_envi(image_path)
        corrected_image = self.glint_correct_image(input_image, wavelengths, **kwargs)
        mpu.save_envi(glint_corr_image_path, corrected_image, header_metadata)

__init__(smooth_with_savitsky_golay=True)

Parameters:

Name Type Description Default
smooth_with_savitsky_golay bool

Whether to smooth glint corrected images using a Savitsky-Golay filter.

True
Source code in massipipe/glint.py
def __init__(self, smooth_with_savitsky_golay: bool = True):
    """Set up the flat-spectrum glint corrector.

    Parameters
    ----------
    smooth_with_savitsky_golay : bool, default True
        If True, glint corrected images are smoothed with a Savitzky-Golay
        filter.
    """
    # Single configuration flag; no model state is needed for this corrector.
    self.smooth_with_savitsky_golay = smooth_with_savitsky_golay

glint_correct_image(refl_image, refl_wl)

Remove sun and sky glint from an image by assuming a flat glint spectrum.

Parameters:

Name Type Description Default
refl_image NDArray

Reflectance image with shape (n_lines, n_samples, n_bands).

required
refl_wl NDArray

Wavelength vector (nm) for each band in refl_image.

required

Returns:

Type Description
NDArray

Glint corrected reflectance image with the same shape as refl_image. The mean NIR value is subtracted from each spectrum so that only the spectral offset is changed while preserving the overall spectral shape.

Notes
  • Assumes negligible water-leaving radiance in the NIR region.
  • Assumes that the sun and sky glint have a flat spectrum, which can be a close approximation.
Source code in massipipe/glint.py
def glint_correct_image(self, refl_image: NDArray, refl_wl: NDArray) -> NDArray:
    """
    Remove sun and sky glint from an image by assuming a flat glint spectrum.

    Parameters
    ----------
    refl_image : NDArray
        Reflectance image with shape (n_lines, n_samples, n_bands).
    refl_wl : NDArray
        Wavelength vector (nm) for each band in refl_image.

    Returns
    -------
    NDArray
        Glint corrected reflectance image with the same shape as refl_image.
        Only the spectral offset changes; the overall spectral shape is
        preserved.

    Notes
    -----
    - Assumes negligible water-leaving radiance in the NIR region.
    - Assumes that the sun and sky glint have a flat spectrum, which can be a
      close approximation.
    """
    # Per-pixel glint estimate: mean reflectance over the NIR bands
    nir_band_ind = mpu.get_nir_ind(refl_wl)
    glint_estimate = np.mean(refl_image[:, :, nir_band_ind], axis=2, keepdims=True)
    corrected_image = refl_image - glint_estimate

    if not self.smooth_with_savitsky_golay:
        return corrected_image
    return mpu.savitzky_golay_filter(corrected_image)

glint_correct_image_file(image_path, glint_corr_image_path, **kwargs)

Read an ENVI header file, perform glint correction, and save the corrected image.

Parameters:

Name Type Description Default
image_path Union[Path, str]

Path to the hyperspectral image header.

required
glint_corr_image_path Union[Path, str]

Path to save the glint corrected image header.

required
Source code in massipipe/glint.py
def glint_correct_image_file(
    self,
    image_path: Union[Path, str],
    glint_corr_image_path: Union[Path, str],
    **kwargs: Any,
):
    """
    Read an ENVI header file, perform glint correction, and save the corrected image.

    Parameters
    ----------
    image_path : Union[Path, str]
        Path to the hyperspectral image header.
    glint_corr_image_path : Union[Path, str]
        Path to save the glint corrected image header.
    """
    # Read image, correct in memory, then write with the original metadata
    input_image, wavelengths, header_metadata = mpu.read_envi(image_path)
    corrected_image = self.glint_correct_image(input_image, wavelengths, **kwargs)
    mpu.save_envi(glint_corr_image_path, corrected_image, header_metadata)

HedleyGlintCorrector

Perform glint correction using a fitted linear regression model (Hedley method).

This class corrects glint by fitting a linear model to reference spectra and then applying this model to correct the visible spectrum of the input image. It supports optional smoothing, dark spectrum subtraction, and adjusts for negative values to ensure valid outputs.

Source code in massipipe/glint.py
class HedleyGlintCorrector:
    """
    Perform glint correction using a fitted linear regression model (Hedley method).

    This class corrects glint by fitting a linear model to reference spectra and then
    applying this model to correct the visible spectrum of the input image. It supports
    optional smoothing, dark spectrum subtraction, and adjusts for negative values to
    ensure valid outputs.
    """

    def __init__(
        self,
        smooth_spectra: bool = True,
        subtract_dark_spec: bool = True,
        set_negative_values_to_zero: bool = False,
        max_invalid_frac: float = 0.05,
    ):
        """Initialize glint corrector

        Parameters
        ----------
        smooth_spectra : bool, default True
            Whether to smooth glint corrected images using a Savitzky-Golay filter.
        subtract_dark_spec : bool, default True
            Whether to subtract estimated minimum value in training data (for each
            wavelength) from glint corrected image. This has the effect of removing
            "background" spectrum caused by e.g. reflection of the sky and water column
            scattering.
        set_negative_values_to_zero : bool, default False
            Glint is corrected by subtracting estimated glint from the original image.
            The subtraction process may result in some spectral bands getting negative
            values. If the fraction of bands that is negative is larger than
            max_invalid_frac, all pixel values are set to zero, indicating an
            invalid pixel.
        max_invalid_frac : float, default 0.05
            The fraction of spectral bands that is allowed to be invalid (i.e. zero)
            before the whole pixel is declared invalid and all bands are set to zero.
            Allowing some invalid bands may keep useful information, but a high number
            of invalid bands results in severe spectral distortion and indicates poor
            data quality.
        """
        # Configuration
        self.smooth_spectra = smooth_spectra
        self.subtract_dark_spec = subtract_dark_spec
        self.set_negative_values_to_zero = set_negative_values_to_zero
        self.max_invalid_frac = max_invalid_frac
        # Model state, populated by fit()
        self.b: Optional[NDArray] = None
        self.wl: Optional[NDArray] = None
        self.min_nir: Optional[NDArray] = None
        self.vis_ind: Optional[Any] = None
        self.nir_ind: Optional[Any] = None
        self.dark_spec: Optional[NDArray] = None

    def fit_to_reference_images(
        self,
        reference_image_paths: Sequence[Union[Path, str]],
        reference_image_ranges: Optional[Sequence[Optional[Sequence[int]]]] = None,
        sample_frac: float = 0.5,
    ) -> None:
        """
        Fit the glint correction model based on spectra from reference hyperspectral images.

        Parameters
        ----------
        reference_image_paths : Sequence[Union[Path, str]]
            List of paths to reference hyperspectral image headers.
        reference_image_ranges : Union[None, list[Union[None, list[int]]]]
            List of pixel ranges to use as reference from each image. If None, the full
            image is used. Ranges are specified as [line_start, line_end, sample_start,
            sample_end].
        sample_frac : float
            Fraction of total pixels used for training (0.0 to 1.0). Pixels are randomly
            sampled.

        Raises
        ------
        ValueError
            If no reference image paths are given.
        """
        # Guard: an empty path list would otherwise fail later with an obscure
        # error (the wavelength vector would never be assigned).
        if not reference_image_paths:
            raise ValueError("At least one reference image path is required")

        if reference_image_ranges is None:
            reference_image_ranges = [None for _ in range(len(reference_image_paths))]

        train_spec = []
        for ref_im_path, ref_im_range in zip(reference_image_paths, reference_image_ranges):
            ref_im, wl, _ = mpu.read_envi(Path(ref_im_path))
            if ref_im_range is not None:
                # Crop to the specified [line_start, line_end, sample_start, sample_end]
                assert len(ref_im_range) == 4
                line_start, line_end, sample_start, sample_end = ref_im_range
                ref_im = ref_im[line_start:line_end, sample_start:sample_end, :]
            sampled_spectra = mpu.random_sample_image(ref_im, sample_frac=sample_frac)
            train_spec.append(sampled_spectra)
        # Use np.concatenate, not np.concat: the latter is an alias that only
        # exists in NumPy >= 2.0.
        train_spec = np.concatenate(train_spec)
        self.fit(train_spec, wl)

    def fit(self, train_spec: NDArray, wl: NDArray) -> None:
        """
        Fit the linear glint correction model to training spectra.

        Parameters
        ----------
        train_spec : NDArray
            Training spectra with shape (n_samples, n_bands).
        wl : NDArray
            Wavelength vector (nm).
        """
        self.wl = wl
        self.vis_ind = mpu.get_vis_ind(wl)
        self.nir_ind = mpu.get_nir_ind(wl)

        # Regress each visible band onto the mean NIR signal
        x = np.mean(train_spec[:, self.nir_ind], axis=1, keepdims=True)
        Y = train_spec[:, self.vis_ind]
        self.b = self.linear_regression_multiple_dependent_variables(x, Y)
        # Robust (1st percentile) estimates of the minimum NIR level and the
        # per-band "dark" background spectrum
        self.min_nir = np.percentile(x, q=1, axis=None)
        self.dark_spec = np.percentile(train_spec, q=1, axis=0)

    @staticmethod
    def linear_regression_multiple_dependent_variables(x: NDArray, Y: NDArray) -> NDArray:
        """
        Compute slopes for a multiple-output linear regression using a single predictor.

        Parameters
        ----------
        x : NDArray
            Predictor variable, shape (n_samples, 1).
        Y : NDArray
            Response variables, shape (n_samples, n_variables).

        Returns
        -------
        NDArray
            Regression slopes for each dependent variable as defined in f(x) = a + b*x.

        Notes
        -----
        - Implementation inspired by
          https://stackoverflow.com/questions/48105922/numpy-covariance-between-each-column-of-a-matrix-and-a-vector
        """
        assert Y.shape[0] == x.shape[0]

        # Compute variance of x and covariance between each column of Y and x.
        # NOTE: No need to scale each with N-1 as it cancels when computing b.
        x_zero_mean = x - x.mean()
        x_var = x_zero_mean.T @ x_zero_mean
        x_Y_cov = x_zero_mean.T @ (Y - Y.mean(axis=0))

        # Slope b = cov(x, y) / var(x) for each response column
        b = x_Y_cov / x_var

        return b

    def glint_correct_image(
        self,
        image: NDArray,
    ) -> NDArray:
        """
        Remove sun and sky glint from a hyperspectral image using the fitted linear model.

        Parameters
        ----------
        image : NDArray
            Hyperspectral image with shape (n_lines, n_samples, n_bands).

        Returns
        -------
        NDArray
            Glint corrected image containing only visible light spectra.

        Raises
        ------
        ValueError
            If the model has not been fitted (run fit() first).

        Notes
        -----
        - Based on the assumption of negligible water-leaving radiance in the NIR,
          it subtracts a modeled glint estimate from the visible bands.
        """
        # Fail fast with a clear message instead of an opaque TypeError when
        # the regression model has not been fitted yet.
        if self.b is None or self.vis_ind is None or self.nir_ind is None:
            raise ValueError("Glint model not fitted - run fit() first")

        # Shape into 2D array, save original shape for later
        input_shape = image.shape
        image = image.reshape((-1, image.shape[-1]))  # 2D, shape (n_samples, n_bands)

        # Pixels that are not all-zero are considered valid input pixels
        valid_mask = ~np.all(image == 0, axis=1)

        # Extract VIS and NIR bands
        vis = image[:, self.vis_ind]
        nir = np.mean(image[:, self.nir_ind], axis=1, keepdims=True)

        # Offset NIR, taking into account "ambient" (minimum) NIR
        nir = nir - self.min_nir
        nir[nir < 0] = 0  # Positivity constraint

        # Estimate glint and subtract from visible spectrum
        glint = nir @ self.b
        vis = vis - glint

        if self.subtract_dark_spec:
            if self.dark_spec is not None:
                vis = vis - self.dark_spec[self.vis_ind]
            else:
                raise ValueError("Dark spectrum not calculated - run fit() first")

        if self.set_negative_values_to_zero:
            # Clip negative bands to zero
            vis[vis < 0] = 0

            # Pixels where too large a fraction of bands was zeroed are
            # declared invalid and set entirely to zero.
            # NOTE(review): invalidation only runs when negative clipping is
            # enabled — presumably intentional, since zeroed bands are what is
            # counted; confirm against project requirements.
            zeros_fraction = np.count_nonzero(vis == 0, axis=1) / vis.shape[1]
            newly_invalid = valid_mask & (zeros_fraction > self.max_invalid_frac)
            vis[newly_invalid] = 0

        # Reshape to fit original dimensions
        output_shape = input_shape[:-1] + (vis.shape[-1],)
        vis = np.reshape(vis, output_shape)

        # Smooth spectra (optional)
        if self.smooth_spectra:
            vis = mpu.savitzky_golay_filter(vis)

        return vis

    def glint_correct_image_file(
        self,
        image_path: Union[Path, str],
        glint_corr_image_path: Union[Path, str],
        **kwargs: Any,
    ):
        """
        Read a hyperspectral image from an ENVI header, apply glint correction, and save the result.

        Parameters
        ----------
        image_path : Union[Path, str]
            Path to the input ENVI header file.
        glint_corr_image_path : Union[Path, str]
            Path to save the glint corrected ENVI header file.
        """
        # Glint correction
        image, wl, metadata = mpu.read_envi(image_path)
        glint_corr_image = self.glint_correct_image(image)

        # Limit wavelengths to only visible
        wl = wl[self.vis_ind]
        metadata["wavelength"] = mpu.array_to_header_string(wl)
        if "solar irradiance" in metadata:
            irrad_spec = mpu.header_string_to_array(metadata["solar irradiance"])
            irrad_spec = irrad_spec[self.vis_ind]
            metadata["solar irradiance"] = mpu.array_to_header_string(irrad_spec)

        # Save
        mpu.save_envi(glint_corr_image_path, glint_corr_image, metadata)

__init__(smooth_spectra=True, subtract_dark_spec=True, set_negative_values_to_zero=False, max_invalid_frac=0.05)

Parameters:

Name Type Description Default
smooth_spectra bool

Whether to smooth glint corrected images using a Savitzky-Golay filter.

True
subtract_dark_spec bool

Whether to subtract estimated minimum value in training data (for each wavelength) from glint corrected image. This has the effect of removing "background" spectrum caused by e.g. reflection of the sky and water column scattering.

True
set_negative_values_to_zero bool

Glint is corrected by subtracting estimated glint from the original image. The subtraction process may result in some spectral bands getting negative values. If the fraction of pixels that is negative is larger than max_invalid_fraction, all pixel values are set to zero, indicating an invalid pixel.

False
max_invalid_frac float

The fraction of spectral bands that is allowed to be invalid (i.e. zero) before the whole pixel is declared invalid and all bands are set to zero. Allowing some invalid bands may keep useful information, but a high number of invalid bands results in severe spectral distortion and indicates poor data quality.

0.05
Source code in massipipe/glint.py
def __init__(
    self,
    smooth_spectra: bool = True,
    subtract_dark_spec: bool = True,
    set_negative_values_to_zero: bool = False,
    max_invalid_frac: float = 0.05,
):
    """Set up the Hedley glint corrector.

    Parameters
    ----------
    smooth_spectra : bool, default True
        If True, smooth glint corrected spectra with a Savitzky-Golay filter.
    subtract_dark_spec : bool, default True
        If True, subtract the estimated per-wavelength minimum of the training
        data from the glint corrected image. This removes a "background"
        spectrum caused by e.g. reflection of the sky and water column
        scattering.
    set_negative_values_to_zero : bool, default False
        Glint correction subtracts the estimated glint from the original
        image, which may leave some spectral bands negative. If the fraction
        of bands that is negative exceeds max_invalid_frac, all pixel values
        are set to zero, marking the pixel invalid.
    max_invalid_frac : float, default 0.05
        Maximum fraction of invalid (zero) spectral bands allowed before the
        whole pixel is declared invalid and all bands are set to zero.
        Tolerating a few invalid bands can retain useful information, while
        many invalid bands indicate severe spectral distortion and poor data
        quality.
    """
    # Configuration flags
    self.smooth_spectra = smooth_spectra
    self.subtract_dark_spec = subtract_dark_spec
    self.set_negative_values_to_zero = set_negative_values_to_zero
    self.max_invalid_frac = max_invalid_frac
    # Model state - all filled in by fit()
    self.b: Optional[NDArray] = None
    self.wl: Optional[NDArray] = None
    self.min_nir: Optional[NDArray] = None
    self.vis_ind: Optional[Any] = None
    self.nir_ind: Optional[Any] = None
    self.dark_spec: Optional[NDArray] = None

fit_to_reference_images(reference_image_paths, reference_image_ranges=None, sample_frac=0.5)

Fit the glint correction model based on spectra from reference hyperspectral images.

Parameters:

Name Type Description Default
reference_image_paths list[Union[Path, str]]

List of paths to reference hyperspectral image headers.

required
reference_image_ranges Union[None, list[Union[None, list[int]]]]

List of pixel ranges to use as reference from each image. If None, the full image is used. Ranges are specified as [line_start, line_end, sample_start, sample_end].

None
sample_frac float

Fraction of total pixels used for training (0.0 to 1.0). Pixels are randomly sampled.

0.5
Source code in massipipe/glint.py
def fit_to_reference_images(
    self,
    reference_image_paths: Sequence[Union[Path, str]],
    reference_image_ranges: Optional[Sequence[Optional[Sequence[int]]]] = None,
    sample_frac: float = 0.5,
) -> None:
    """
    Fit the glint correction model based on spectra from reference hyperspectral images.

    Parameters
    ----------
    reference_image_paths : Sequence[Union[Path, str]]
        List of paths to reference hyperspectral image headers.
    reference_image_ranges : Union[None, list[Union[None, list[int]]]]
        List of pixel ranges to use as reference from each image. If None, the full
        image is used. Ranges are specified as [line_start, line_end, sample_start,
        sample_end].
    sample_frac : float
        Fraction of total pixels used for training (0.0 to 1.0). Pixels are randomly
        sampled.

    Raises
    ------
    ValueError
        If no reference image paths are given.
    """
    # Guard: an empty path list would otherwise fail later with an obscure
    # error (the wavelength vector would never be assigned).
    if not reference_image_paths:
        raise ValueError("At least one reference image path is required")

    if reference_image_ranges is None:
        reference_image_ranges = [None for _ in range(len(reference_image_paths))]

    train_spec = []
    for ref_im_path, ref_im_range in zip(reference_image_paths, reference_image_ranges):
        ref_im, wl, _ = mpu.read_envi(Path(ref_im_path))
        if ref_im_range is not None:
            # Crop to the specified [line_start, line_end, sample_start, sample_end]
            assert len(ref_im_range) == 4
            line_start, line_end, sample_start, sample_end = ref_im_range
            ref_im = ref_im[line_start:line_end, sample_start:sample_end, :]
        sampled_spectra = mpu.random_sample_image(ref_im, sample_frac=sample_frac)
        train_spec.append(sampled_spectra)
    # Use np.concatenate, not np.concat: the latter is an alias that only
    # exists in NumPy >= 2.0.
    train_spec = np.concatenate(train_spec)
    self.fit(train_spec, wl)

fit(train_spec, wl)

Fit the linear glint correction model to training spectra.

Parameters:

Name Type Description Default
train_spec NDArray

Training spectra with shape (n_samples, n_bands).

required
wl NDArray

Wavelength vector (nm).

required
Source code in massipipe/glint.py
def fit(self, train_spec: NDArray, wl: NDArray) -> None:
    """
    Fit the linear glint correction model to training spectra.

    Parameters
    ----------
    train_spec : NDArray
        Training spectra with shape (n_samples, n_bands).
    wl : NDArray
        Wavelength vector (nm).
    """
    # Store wavelengths and visible / NIR band index sets for later correction
    self.wl = wl
    self.vis_ind = mpu.get_vis_ind(wl)
    self.nir_ind = mpu.get_nir_ind(wl)

    # Regress each visible band onto the mean NIR signal
    nir_mean = np.mean(train_spec[:, self.nir_ind], axis=1, keepdims=True)
    vis_spec = train_spec[:, self.vis_ind]
    self.b = self.linear_regression_multiple_dependent_variables(nir_mean, vis_spec)

    # Robust (1st percentile) estimates of the minimum NIR level and the
    # per-band "dark" background spectrum
    self.min_nir = np.percentile(nir_mean, q=1, axis=None)
    self.dark_spec = np.percentile(train_spec, q=1, axis=0)

linear_regression_multiple_dependent_variables(x, Y) staticmethod

Compute slopes for a multiple-output linear regression using a single predictor.

Parameters:

Name Type Description Default
x NDArray

Predictor variable, shape (n_samples, 1).

required
Y NDArray

Response variables, shape (n_samples, n_variables).

required

Returns:

Type Description
NDArray

Regression slopes for each dependent variable as defined in f(x) = a + b*x.

Notes
  • Implementation inspired by https://stackoverflow.com/questions/48105922/numpy-covariance-between-each-column-of-a-matrix-and-a-vector
Source code in massipipe/glint.py
@staticmethod
def linear_regression_multiple_dependent_variables(x: NDArray, Y: NDArray) -> NDArray:
    """
    Compute slopes for a multiple-output linear regression using a single predictor.

    Parameters
    ----------
    x : NDArray
        Predictor variable, shape (n_samples, 1).
    Y : NDArray
        Response variables, shape (n_samples, n_variables).

    Returns
    -------
    NDArray
        Regression slopes for each dependent variable as defined in f(x) = a + b*x.

    Notes
    -----
    - Implementation inspired by https://stackoverflow.com/questions/
    48105922/numpy-covariance-between-each-column-of-a-matrix-and-a-vector

    """
    assert Y.shape[0] == x.shape[0]

    # Compute variance of y and covariance between each column of X and y
    # NOTE: No need to scale each with N-1 as it will be cancelled when computing b
    x_zero_mean = x - x.mean()
    x_var = x_zero_mean.T @ x_zero_mean
    x_Y_cov = x_zero_mean.T @ (Y - Y.mean(axis=0))

    # Compute slopes of linear regression with y as independent variable
    b = x_Y_cov / x_var

    return b

glint_correct_image(image)

Remove sun and sky glint from a hyperspectral image using the fitted linear model.

Parameters:

Name Type Description Default
image NDArray

Hyperspectral image with shape (n_lines, n_samples, n_bands).

required

Returns:

Type Description
NDArray

Glint corrected image containing only visible light spectra.

Notes
  • Based on the assumption of negligible water-leaving radiance in the NIR, it subtracts a modeled glint estimate from the visible bands.
Source code in massipipe/glint.py
def glint_correct_image(
    self,
    image: NDArray,
) -> NDArray:
    """
    Remove sun and sky glint from a hyperspectral image using the fitted linear model.

    Parameters
    ----------
    image : NDArray
        Hyperspectral image with shape (n_lines, n_samples, n_bands).

    Returns
    -------
    NDArray
        Glint corrected image containing only visible light spectra.

    Notes
    -----
    - Based on the assumption of negligible water-leaving radiance in the NIR,
      it subtracts a modeled glint estimate from the visible bands.
    """
    # Shape into 2D array, save original shape for later
    input_shape = image.shape
    image = image.reshape((-1, image.shape[-1]))  # 2D, shape (n_samples, n_bands)

    # Detect all-zero pixels (invalid)
    # NOTE(review): at this point the mask is True for pixels that are NOT
    # all-zero (i.e. *valid* pixels); it only becomes an invalid-pixel mask
    # after being AND-ed with the zeros-fraction condition further down.
    invalid_mask = ~np.all(image == 0, axis=1)

    # Extract VIS and NIR bands (NIR collapsed to one mean value per pixel)
    vis = image[:, self.vis_ind]
    nir = np.mean(image[:, self.nir_ind], axis=1, keepdims=True)

    # Offset NIR, taking into account "ambient" (minimum) NIR
    nir = nir - self.min_nir
    nir[nir < 0] = 0  # Positivity constraint

    # Estimate glint and subtract from visible spectrum
    glint = nir @ self.b
    vis = vis - glint

    if self.subtract_dark_spec:
        if self.dark_spec is not None:
            vis = vis - self.dark_spec[self.vis_ind]
        else:
            raise ValueError("Dark spectrum not calculated - run fit() first")

    # Set negative values to zero
    if self.set_negative_values_to_zero:
        vis[vis < 0] = 0

    # Set invalid pixels (too many zeros) to all-zeros
    # NOTE(review): this guard reuses set_negative_values_to_zero, identical
    # to the guard directly above — possibly a copy-paste leftover. It may be
    # intentional (zeros only appear after the clipping above), but confirm
    # whether this masking step should have its own flag or run unconditionally.
    if self.set_negative_values_to_zero:
        zeros_fraction = np.count_nonzero(vis == 0, axis=1) / vis.shape[1]
        invalid_mask = invalid_mask & (zeros_fraction > self.max_invalid_frac)
        vis[invalid_mask] = 0

    # Reshape to fit original dimensions
    output_shape = input_shape[:-1] + (vis.shape[-1],)
    vis = np.reshape(vis, output_shape)

    # Smooth spectra (optional)
    if self.smooth_spectra:
        vis = mpu.savitzky_golay_filter(vis)

    return vis

glint_correct_image_file(image_path, glint_corr_image_path, **kwargs)

Read a hyperspectral image from an ENVI header, apply glint correction, and save the result.

Parameters:

Name Type Description Default
image_path Union[Path, str]

Path to the input ENVI header file.

required
glint_corr_image_path Union[Path, str]

Path to save the glint corrected ENVI header file.

required
Source code in massipipe/glint.py
def glint_correct_image_file(
    self,
    image_path: Union[Path, str],
    glint_corr_image_path: Union[Path, str],
    **kwargs: Any,
):
    """
    Read a hyperspectral image from an ENVI header, apply glint correction, and save the result.

    Parameters
    ----------
    image_path : Union[Path, str]
        Path to the input ENVI header file.
    glint_corr_image_path : Union[Path, str]
        Path to save the glint corrected ENVI header file.
    """
    # Load image and run glint correction
    image, wl, metadata = mpu.read_envi(image_path)
    corrected = self.glint_correct_image(image)

    # The corrected image only covers visible bands; crop the wavelength
    # metadata (and solar irradiance spectrum, if present) to match.
    wl = wl[self.vis_ind]
    metadata["wavelength"] = mpu.array_to_header_string(wl)
    if "solar irradiance" in metadata:
        irrad_spec = mpu.header_string_to_array(metadata["solar irradiance"])
        irrad_spec = irrad_spec[self.vis_ind]
        metadata["solar irradiance"] = mpu.array_to_header_string(irrad_spec)

    # Write result to disk
    mpu.save_envi(glint_corr_image_path, corrected, metadata)

IrradianceConverter

A class to convert raw spectra to irradiance using calibration data.

Methods:

Name Description
convert_raw_spectrum_to_irradiance

(raw_spec, raw_metadata, set_irradiance_outside_wl_limits_to_zero=True, keep_original_dimensions=True) Convert raw spectrum to irradiance.

convert_raw_file_to_irradiance

Read raw spectrum, convert to irradiance, and save.

Source code in massipipe/irradiance.py
class IrradianceConverter:
    """
    A class to convert raw spectra to irradiance using calibration data.

    Methods
    -------
    convert_raw_spectrum_to_irradiance(raw_spec, raw_metadata,
        set_irradiance_outside_wl_limits_to_zero=True, keep_original_dimensions=True)
        Convert raw spectrum to irradiance.
    convert_raw_file_to_irradiance(raw_spec_path, irrad_spec_path)
        Read raw spectrum, convert to irradiance, and save.
    """

    def __init__(
        self,
        irrad_cal_file: Union[Path, str],
        irrad_cal_dir_name: str = "downwelling_calibration_spectra",
        wl_min: Union[float, None] = None,
        wl_max: Union[float, None] = None,
    ):
        """Initialize irradiance converter.

        Parameters
        ----------
        irrad_cal_file : Union[Path, str]
            Path to downwelling irradiance calibration file.
        irrad_cal_dir_name : str, default "downwelling_calibration_spectra"
            Name of folder which calibration files will be unzipped into.
        wl_min : Union[float, None], optional
            Shortest valid wavelength (nm) in irradiance spectrum.
        wl_max : Union[float, None], optional
            Longest valid wavelength (nm) in irradiance spectrum.

        Raises
        ------
        FileNotFoundError
            If the calibration file does not exist.
        """
        # Save calibration file path
        self.irrad_cal_file = Path(irrad_cal_file)
        if not self.irrad_cal_file.exists():
            raise FileNotFoundError(f"Calibration file {self.irrad_cal_file} does not exist.")

        # Unzip calibration frames into subdirectory
        self.irrad_cal_dir = self.irrad_cal_file.parent / irrad_cal_dir_name
        self.irrad_cal_dir.mkdir(exist_ok=True)
        self._unzip_irrad_cal_file()

        # Load calibration data
        self._load_cal_dark_and_sensitivity_spectra()

        # Set valid wavelength range (defaults to full calibrated range)
        self.wl_min = self._irrad_wl[0] if wl_min is None else wl_min
        self.wl_max = self._irrad_wl[-1] if wl_max is None else wl_max
        self._valid_wl_ind = (self._irrad_wl >= self.wl_min) & (self._irrad_wl <= self.wl_max)

    def _unzip_irrad_cal_file(self, unzip_into_nonempty_dir: bool = False) -> None:
        """Unzip *.dcp file (which is a zip file).

        Parameters
        ----------
        unzip_into_nonempty_dir : bool, optional
            Whether to unzip into a non-empty directory.

        Raises
        ------
        zipfile.BadZipFile
            If the calibration file is not a valid ZIP file.
        """
        # Skip if the target directory already has content (assume done)
        if not unzip_into_nonempty_dir and any(self.irrad_cal_dir.iterdir()):
            logger.info(f"Non-empty downwelling calibration directory {self.irrad_cal_dir}")
            logger.info(
                "Skipping unzipping of downwelling calibration file, "
                "assuming unzipping already done."
            )
            return
        try:
            with zipfile.ZipFile(self.irrad_cal_file, mode="r") as zip_file:
                # extractall() replaces a manual per-member extraction loop
                zip_file.extractall(self.irrad_cal_dir)
        except zipfile.BadZipFile:
            logger.error(f"File {self.irrad_cal_file} is not a valid ZIP file.", exc_info=True)
            raise
        except Exception:
            logger.error(
                f"Error while extracting downwelling calibration file {self.irrad_cal_file}",
                exc_info=True,
            )
            raise

    def _load_cal_dark_and_sensitivity_spectra(self) -> None:
        """Load dark current and irradiance sensitivity spectra from cal. files.

        Raises
        ------
        FileNotFoundError
            If the expected calibration files are missing.
        ValueError
            If the dark current and gain spectra have different wavelengths.
        """
        # Define paths
        cal_dark_path = self.irrad_cal_dir / "offset.spec.hdr"
        irrad_sens_path = self.irrad_cal_dir / "gain.spec.hdr"
        if not cal_dark_path.exists() or not irrad_sens_path.exists():
            raise FileNotFoundError("Calibration files not found in the specified directory.")

        # Read from files
        cal_dark_spec, cal_dark_wl, cal_dark_metadata = mpu.read_envi(cal_dark_path)
        irrad_sens_spec, irrad_sens_wl, irrad_sens_metadata = mpu.read_envi(irrad_sens_path)

        # Save attributes
        if not np.array_equal(cal_dark_wl, irrad_sens_wl):
            raise ValueError("Wavelengths in dark current and gain spectra do not match.")
        self._irrad_wl = cal_dark_wl
        self._cal_dark_spec = np.squeeze(cal_dark_spec)  # Remove singleton axes
        self._cal_dark_metadata = cal_dark_metadata
        self._irrad_sens_spec = np.squeeze(irrad_sens_spec)  # Remove singleton axes
        self._irrad_sens_metadata = irrad_sens_metadata
        self._cal_dark_shutter = float(cal_dark_metadata["shutter"])
        self._irrad_sens_shutter = float(irrad_sens_metadata["shutter"])

    def convert_raw_spectrum_to_irradiance(
        self,
        raw_spec: NDArray,
        raw_metadata: dict,
        set_irradiance_outside_wl_limits_to_zero: bool = True,
        keep_original_dimensions: bool = True,
    ) -> NDArray:
        """Convert raw spectrum to irradiance.

        Parameters
        ----------
        raw_spec : NDArray
            Raw spectrum.
        raw_metadata : dict
            Dictionary with ENVI metadata for raw spectrum.
        set_irradiance_outside_wl_limits_to_zero : bool, optional
            Whether to set spectrum values outside wavelength limits to zero.
            If False, values outside valid range are treated the same as
            values inside valid range.
        keep_original_dimensions : bool, optional
            Whether to format output spectrum with same (singleton)
            dimensions as input spectrum. Can be useful for broadcasting.

        Returns
        -------
        NDArray
            Spectrum converted to spectral irradiance, unit W/(m2*nm).

        Raises
        ------
        ValueError
            If raw_spec cannot be squeezed to a 1D array.

        Notes
        -----
        The irradiance conversion spectrum is _inversely_ scaled with
        the input spectrum shutter. E.g., if the input spectrum has a higher shutter
        value than the calibration file (i.e. higher values per amount of photons),
        the conversion spectrum values are decreased to account for this.
        Dark current is assumed to be independent of shutter value.
        """
        original_input_dimensions = raw_spec.shape
        raw_spec = np.squeeze(raw_spec)
        if raw_spec.ndim != 1:
            raise ValueError("Input raw_spec must be a 1D array.")

        # Scale conversion spectrum according to difference in shutter values
        raw_shutter = float(raw_metadata["shutter"])
        scaled_conv_spec = (self._irrad_sens_shutter / raw_shutter) * self._irrad_sens_spec

        # Subtract dark current, multiply with radiance conversion spectrum
        # NOTE: Resonon irradiance unit is uW/(pi*cm2*um) = 1e-5 W/(pi*m2*nm)
        # Scaling with pi is at least consistent with Resonon code.
        cal_irrad_spec = (raw_spec - self._cal_dark_spec) * scaled_conv_spec

        # Convert to standard spectral irradiance unit W/(m2*nm)
        cal_irrad_spec = mpu.irrad_uflicklike_to_si_nm(cal_irrad_spec * np.pi)

        # Set spectrum outside wavelength limits to zero
        if set_irradiance_outside_wl_limits_to_zero:
            cal_irrad_spec[~self._valid_wl_ind] = 0

        if keep_original_dimensions:
            cal_irrad_spec = np.reshape(cal_irrad_spec, original_input_dimensions)

        return cal_irrad_spec

    def convert_raw_file_to_irradiance(
        self, raw_spec_path: Union[Path, str], irrad_spec_path: Union[Path, str]
    ) -> None:
        """Read raw spectrum, convert to irradiance, and save.

        Parameters
        ----------
        raw_spec_path : Union[Path, str]
            Path to ENVI header file for raw spectrum.
        irrad_spec_path : Union[Path, str]
            Path to ENVI header file for saving irradiance spectrum.
        """
        try:
            raw_spec, _, spec_metadata = mpu.read_envi(raw_spec_path)
            irrad_spec = self.convert_raw_spectrum_to_irradiance(raw_spec, spec_metadata)
            spec_metadata["unit"] = "W/(m2*nm)"
            mpu.save_envi(irrad_spec_path, irrad_spec, spec_metadata)
        except Exception as e:
            logger.error(f"Error converting raw file to irradiance: {e}", exc_info=True)
            raise

__init__(irrad_cal_file, irrad_cal_dir_name='downwelling_calibration_spectra', wl_min=None, wl_max=None)

Parameters:

Name Type Description Default
irrad_cal_file Union[Path, str]

Path to downwelling irradiance calibration file.

required
irrad_cal_dir_name str

Name of folder which calibration files will be unzipped into.

"downwelling_calibration_spectra"
wl_min Union[float, None]

Shortest valid wavelength (nm) in irradiance spectrum.

None
wl_max Union[float, None]

Longest valid wavelength (nm) in irradiance spectrum.

None
Source code in massipipe/irradiance.py
def __init__(
    self,
    irrad_cal_file: Union[Path, str],
    irrad_cal_dir_name: str = "downwelling_calibration_spectra",
    wl_min: Union[float, None] = None,
    wl_max: Union[float, None] = None,
):
    """Initialize irradiance converter.

    Parameters
    ----------
    irrad_cal_file : Union[Path, str]
        Path to downwelling irradiance calibration file.
    irrad_cal_dir_name : str, default "downwelling_calibration_spectra"
        Name of folder which calibration files will be unzipped into.
    wl_min : Union[float, None], optional
        Shortest valid wavelength (nm) in irradiance spectrum.
    wl_max : Union[float, None], optional
        Longest valid wavelength (nm) in irradiance spectrum.
    """
    # Calibration file must exist before anything else is done
    self.irrad_cal_file = Path(irrad_cal_file)
    if not self.irrad_cal_file.exists():
        raise FileNotFoundError(f"Calibration file {self.irrad_cal_file} does not exist.")

    # Prepare output directory and unpack calibration frames into it
    self.irrad_cal_dir = self.irrad_cal_file.parent / irrad_cal_dir_name
    self.irrad_cal_dir.mkdir(exist_ok=True)
    self._unzip_irrad_cal_file()

    # Read dark current and sensitivity calibration spectra
    self._load_cal_dark_and_sensitivity_spectra()

    # Valid wavelength range defaults to the full calibrated range
    self.wl_min = wl_min if wl_min is not None else self._irrad_wl[0]
    self.wl_max = wl_max if wl_max is not None else self._irrad_wl[-1]
    self._valid_wl_ind = (self._irrad_wl >= self.wl_min) & (self._irrad_wl <= self.wl_max)

_unzip_irrad_cal_file(unzip_into_nonempty_dir=False)

Unzip *.dcp file (which is a zip file).

Parameters:

Name Type Description Default
unzip_into_nonempty_dir bool

Whether to unzip into a non-empty directory.

False
Source code in massipipe/irradiance.py
def _unzip_irrad_cal_file(self, unzip_into_nonempty_dir: bool = False) -> None:
    """Unzip *.dcp file (which is a zip file).

    Parameters
    ----------
    unzip_into_nonempty_dir : bool, optional
        Whether to unzip into a non-empty directory.

    Raises
    ------
    zipfile.BadZipFile
        If the calibration file is not a valid ZIP file.
    """
    # Skip if the target directory already has content (assume done)
    if not unzip_into_nonempty_dir and any(self.irrad_cal_dir.iterdir()):
        logger.info(f"Non-empty downwelling calibration directory {self.irrad_cal_dir}")
        logger.info(
            "Skipping unzipping of downwelling calibration file, "
            "assuming unzipping already done."
        )
        return
    try:
        with zipfile.ZipFile(self.irrad_cal_file, mode="r") as zip_file:
            # extractall() replaces a manual per-member extraction loop
            zip_file.extractall(self.irrad_cal_dir)
    except zipfile.BadZipFile:
        logger.error(f"File {self.irrad_cal_file} is not a valid ZIP file.", exc_info=True)
        raise
    except Exception:
        logger.error(
            f"Error while extracting downwelling calibration file {self.irrad_cal_file}",
            exc_info=True,
        )
        raise

_load_cal_dark_and_sensitivity_spectra()

Load dark current and irradiance sensitivity spectra from cal. files.

Source code in massipipe/irradiance.py
def _load_cal_dark_and_sensitivity_spectra(self) -> None:
    """Load dark current and irradiance sensitivity spectra from cal. files."""
    # Expected file names inside the unzipped calibration directory
    dark_path = self.irrad_cal_dir / "offset.spec.hdr"
    gain_path = self.irrad_cal_dir / "gain.spec.hdr"
    if not dark_path.exists() or not gain_path.exists():
        raise FileNotFoundError("Calibration files not found in the specified directory.")

    # Read both calibration spectra from disk
    dark_spec, dark_wl, dark_metadata = mpu.read_envi(dark_path)
    gain_spec, gain_wl, gain_metadata = mpu.read_envi(gain_path)

    # Both files must share the same wavelength axis
    if not np.array_equal(dark_wl, gain_wl):
        raise ValueError("Wavelengths in dark current and gain spectra do not match.")

    # Store calibration data as private attributes
    self._irrad_wl = dark_wl
    self._cal_dark_spec = np.squeeze(dark_spec)  # Remove singleton axes
    self._cal_dark_metadata = dark_metadata
    self._irrad_sens_spec = np.squeeze(gain_spec)  # Remove singleton axes
    self._irrad_sens_metadata = gain_metadata
    self._cal_dark_shutter = float(dark_metadata["shutter"])
    self._irrad_sens_shutter = float(gain_metadata["shutter"])

convert_raw_spectrum_to_irradiance(raw_spec, raw_metadata, set_irradiance_outside_wl_limits_to_zero=True, keep_original_dimensions=True)

Convert raw spectrum to irradiance.

Parameters:

Name Type Description Default
raw_spec NDArray

Raw spectrum.

required
raw_metadata dict

Dictionary with ENVI metadata for raw spectrum.

required
set_irradiance_outside_wl_limits_to_zero bool

Whether to set spectrum values outside wavelength limits to zero. If False, values outside valid range are treated the same as values inside valid range.

True
keep_original_dimensions bool

Whether to format output spectrum with same (singleton) dimensions as input spectrum. Can be useful for broadcasting.

True

Returns:

Type Description
NDArray

Spectrum converted to spectral irradiance, unit W/(m2*nm).

Notes

The irradiance conversion spectrum is inversely scaled with the input spectrum shutter. E.g., if the input spectrum has a higher shutter value than the calibration file (i.e. higher values per amount of photons), the conversion spectrum values are decreased to account for this. Dark current is assumed to be independent of shutter value.

Source code in massipipe/irradiance.py
def convert_raw_spectrum_to_irradiance(
    self,
    raw_spec: NDArray,
    raw_metadata: dict,
    set_irradiance_outside_wl_limits_to_zero: bool = True,
    keep_original_dimensions: bool = True,
) -> NDArray:
    """Convert raw spectrum to irradiance.

    Parameters
    ----------
    raw_spec : NDArray
        Raw spectrum.
    raw_metadata : dict
        Dictionary with ENVI metadata for raw spectrum.
    set_irradiance_outside_wl_limits_to_zero : bool, optional
        Whether to set spectrum values outside wavelength limits to zero.
        If False, values outside valid range are treated the same as
        values inside valid range.
    keep_original_dimensions : bool, optional
        Whether to format output spectrum with same (singleton)
        dimensions as input spectrum. Can be useful for broadcasting.

    Returns
    -------
    NDArray
        Spectrum converted to spectral irradiance, unit W/(m2*nm).

    Notes
    -----
    The irradiance conversion spectrum is _inversely_ scaled with
    the input spectrum shutter. E.g., if the input spectrum has a higher shutter
    value than the calibration file (i.e. higher values per amount of photons),
    the conversion spectrum values are decreased to account for this.
    Dark current is assumed to be independent of shutter value.
    """
    input_shape = raw_spec.shape
    raw_spec = np.squeeze(raw_spec)
    if raw_spec.ndim != 1:
        raise ValueError("Input raw_spec must be a 1D array.")

    # Conversion spectrum is inversely scaled with the shutter ratio
    shutter = float(raw_metadata["shutter"])
    conv_spec = (self._irrad_sens_shutter / shutter) * self._irrad_sens_spec

    # Dark-current subtraction followed by conversion to irradiance.
    # NOTE: Resonon irradiance unit is uW/(pi*cm2*um); scaling with pi
    # is at least consistent with Resonon code.
    irrad = (raw_spec - self._cal_dark_spec) * conv_spec

    # Convert to standard spectral irradiance unit W/(m2*nm)
    irrad = mpu.irrad_uflicklike_to_si_nm(irrad * np.pi)

    # Zero out bands outside the valid wavelength range (optional)
    if set_irradiance_outside_wl_limits_to_zero:
        irrad[~self._valid_wl_ind] = 0

    # Restore original (possibly singleton) dimensions (optional)
    if keep_original_dimensions:
        irrad = np.reshape(irrad, input_shape)

    return irrad

convert_raw_file_to_irradiance(raw_spec_path, irrad_spec_path)

Read raw spectrum, convert to irradiance, and save.

Parameters:

Name Type Description Default
raw_spec_path Union[Path, str]

Path to ENVI header file for raw spectrum.

required
irrad_spec_path Union[Path, str]

Path to ENVI header file for saving irradiance spectrum.

required
Source code in massipipe/irradiance.py
def convert_raw_file_to_irradiance(
    self, raw_spec_path: Union[Path, str], irrad_spec_path: Union[Path, str]
) -> None:
    """Read raw spectrum, convert to irradiance, and save.

    Parameters
    ----------
    raw_spec_path : Union[Path, str]
        Path to ENVI header file for raw spectrum.
    irrad_spec_path : Union[Path, str]
        Path to ENVI header file for saving irradiance spectrum.
    """
    try:
        # Read, convert, tag with irradiance unit, and write back out
        spectrum, _, metadata = mpu.read_envi(raw_spec_path)
        converted = self.convert_raw_spectrum_to_irradiance(spectrum, metadata)
        metadata["unit"] = "W/(m2*nm)"
        mpu.save_envi(irrad_spec_path, converted, metadata)
    except Exception as e:
        logger.error(f"Error converting raw file to irradiance: {e}", exc_info=True)
        raise

Pipeline

Pipeline class for processing hyperspectral datasets.

This class encapsulates all processing steps required to convert raw or radiance data into calibrated and georeferenced image products. It manages configuration loading, logging setup, file path generation, conversion procedures, glint correction, mosaicing and export operations.

Source code in massipipe/pipeline.py
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
class Pipeline:
    """Pipeline class for processing hyperspectral datasets.

    This class encapsulates all processing steps required to convert raw or radiance data
    into calibrated and georeferenced image products. It manages configuration loading,
    logging setup, file path generation, conversion procedures, glint correction,
    mosaicing and export operations.
    """

    def __init__(
        self,
        dataset_dir: Union[Path, str],
        config_file_name: str = "config.seabee.yaml",
    ) -> None:
        """Initialize the pipeline for dataset processing.

        Parameters
        ----------
        dataset_dir : Union[Path, str]
            Path to the dataset directory containing required subfolders. The required
            subfolders are either "0_raw" and "calibration" (for processing from raw
            data), or "1a_radiance" and "imudata" (for processing from radiance data).
        config_file_name : str, optional
            Name of the YAML configuration file. A template is generated if the file
            does not exist.

        Raises
        ------
        FileNotFoundError
            If required folders (e.g. "0_raw" or "calibration") are missing.

        Notes
        -----
        Initialization has side effects: a "logs" subfolder with a timestamped
        log file is created, and a template YAML config file is written if no
        config file exists yet.
        """

        # Set dataset directory
        self.dataset_dir = Path(dataset_dir)

        # Define dataset folder structure
        self.dataset_base_name = self.dataset_dir.name
        self.raw_dir = self.dataset_dir / "0_raw"
        self.quicklook_dir = self.dataset_dir / "quicklook"
        self.radiance_dir = self.dataset_dir / "1a_radiance"
        self.radiance_rgb_dir = self.radiance_dir / "rgb"
        self.radiance_gc_dir = self.dataset_dir / "1b_radiance_gc"
        self.radiance_gc_rgb_dir = self.radiance_gc_dir / "rgb"
        self.reflectance_dir = self.dataset_dir / "2a_reflectance"
        self.reflectance_gc_dir = self.dataset_dir / "2b_reflectance_gc"
        self.reflectance_gc_rgb_dir = self.reflectance_gc_dir / "rgb"
        self.imudata_dir = self.dataset_dir / "imudata"
        self.geotransform_dir = self.dataset_dir / "geotransform"
        self.mosaic_dir = self.dataset_dir / "mosaics"
        self.mosaic_visualization_dir = self.dataset_dir / "orthomosaic"
        self.calibration_dir = self.dataset_dir / "calibration"
        self.logs_dir = self.dataset_dir / "logs"

        # Configure logging first, so that config parsing problems are logged to file
        self._configure_file_logging()

        # Read config file (a template is exported first if no file exists)
        config_file_path = self.dataset_dir / config_file_name
        self.config_file_path = config_file_path
        if not self.config_file_path.exists():
            logger.info(f"No config file found - exporting template file {config_file_name}")
            export_template_yaml(self.config_file_path)
        self.load_config_from_file()  # Reads config from file into self.config

        # Check if data source is raw data or radiance
        self.data_starting_point = self._check_data_starting_point()

        if self.data_starting_point == "raw":
            # Get calibration file paths
            if not self.calibration_dir.exists():
                raise FileNotFoundError(f'Folder "calibration" not found in {self.dataset_dir}')
            self.radiance_calibration_file = self._get_radiance_calibration_path()
            self.irradiance_calibration_file = self._get_irradiance_calibration_path()

            # Search for raw files, sort and validate
            self.raw_image_paths = list(self.raw_dir.rglob("*.bil.hdr"))
            self.raw_image_paths = sorted(self.raw_image_paths, key=self._get_image_number)
            times_paths, lcf_paths = self._validate_raw_files()
            self.times_paths = times_paths
            self.lcf_paths = lcf_paths

            # Search for raw irradiance spectrum files (not always present)
            self.raw_spec_paths = self._get_raw_spectrum_paths()

        # Create "base" file names numbered from 0
        self.base_file_names = self._create_base_file_names()

        # Create lists of processed file paths (one entry per base file name)
        proc_file_paths = self._create_processed_file_paths()
        self.ql_im_paths = proc_file_paths.quicklook
        self.rad_im_paths = proc_file_paths.radiance
        self.rad_rgb_paths = proc_file_paths.radiance_rgb
        self.rad_gc_im_paths = proc_file_paths.radiance_gc
        self.rad_gc_rgb_paths = proc_file_paths.radiance_gc_rgb
        self.irrad_spec_paths = proc_file_paths.irradiance
        self.imu_data_paths = proc_file_paths.imudata
        self.geotransform_paths = proc_file_paths.geotransform
        self.refl_im_paths = proc_file_paths.reflectance
        self.refl_gc_im_paths = proc_file_paths.reflectance_gc
        self.refl_gc_rgb_paths = proc_file_paths.reflectance_gc_rgb

        # Create mosaic file paths
        self.mosaic_rad_path = self.mosaic_dir / (self.dataset_base_name + "_rad_rgb.tiff")
        self.mosaic_rad_gc_path = self.mosaic_dir / (self.dataset_base_name + "_rad_gc_rgb.tiff")
        self.mosaic_refl_gc_path = self.mosaic_dir / (self.dataset_base_name + "_refl_gc_rgb.tiff")

    def load_config_from_file(self) -> None:
        """Load or reload configuration from a YAML file.

        Reads the YAML file at ``self.config_file_path``, validates it with the
        Pydantic ``Config`` model, and stores the ``massipipe_options`` section
        in ``self.config``. On validation failure a warning is logged and the
        method returns without modifying the current configuration.
        """
        try:
            raw_config = read_config(self.config_file_path)
        except IOError:
            logger.exception(f"Error parsing config file {self.config_file_path}")
            raise

        try:
            validated = Config(**raw_config)
        except ValidationError as validation_error:
            logger.warning(f"Validation error while processing {self.config_file_path}")
            logger.warning(str(validation_error))
            logger.warning(f"No configuration loaded for {self.dataset_dir}.")
        else:
            self.config = validated.massipipe_options
            logger.info(f"Pipeline configured with \n{pprint.pformat(validated.model_dump())}")

    def _configure_file_logging(self) -> None:
        """Set up per-run file logging for this dataset.

        Ensures the logs directory exists, then attaches an INFO-level file
        handler to the module-level logger, writing to a log file named after
        the current timestamp and the dataset base name.
        """
        self.logs_dir.mkdir(exist_ok=True)

        # One log file per pipeline run: <timestamp>_<dataset>.log
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        log_path = self.logs_dir / f"{timestamp}_{self.dataset_base_name}.log"

        handler = logging.FileHandler(log_path)
        handler.setFormatter(
            logging.Formatter(fmt="%(asctime)s %(levelname)s: %(message)s", datefmt="%H:%M:%S")
        )
        handler.setLevel(logging.INFO)
        logger.addHandler(handler)

        # Visual separator makes successive runs easy to find in the log
        logger.info("-" * 64)
        logger.info(f"File logging for {self.dataset_base_name} initialized.")

    def _check_data_starting_point(self) -> str:
        """Determine if processing should start from raw or radiance data.

        Returns
        -------
        str
            "raw" if raw data exists and "radiance" if radiance data exists.

        Raises
        ------
        FileNotFoundError
            If neither raw nor radiance data is found in the dataset directory.
        """
        if self.raw_dir.exists():
            logger.info(f"Found directory {self.raw_dir} - processing from raw files.")
            data_starting_point = "raw"
        else:
            if self.radiance_dir.exists():
                logger.info(
                    f"No raw files found, but radiance directory {self.radiance_dir.name} found"
                    + " - processing based on radiance data."
                )
                data_starting_point = "radiance"
            else:
                raise FileNotFoundError(f"Found neither raw or radiance files in dataset dir.")
        return data_starting_point

    def _validate_raw_files(self) -> tuple[list[Path], list[Path]]:
        """Check that all expected raw files exist

        Returns:
        --------
        times_paths, lcf_paths: list[Path]
            Lists of paths to *.times and *.lcf files for every valid raw file
        """
        times_paths = []
        lcf_paths = []
        for raw_image_path in list(self.raw_image_paths):  # Use list() to copy
            file_base_name = raw_image_path.name.split(".")[0]
            binary_im_path = raw_image_path.parent / (file_base_name + ".bil")
            times_path = raw_image_path.parent / (file_base_name + ".bil.times")
            lcf_path = raw_image_path.parent / (file_base_name + ".lcf")
            if (
                not (binary_im_path.exists())
                or not (times_path.exists())
                or not (lcf_path.exists())
            ):
                logger.warning(f"Set of raw files for image {raw_image_path} is incomplete.")
                self.raw_image_paths.remove(raw_image_path)
            else:
                times_paths.append(times_path)
                lcf_paths.append(lcf_path)
        return times_paths, lcf_paths

    @staticmethod
    def _get_image_number(raw_image_path: Union[Path, str]) -> int:
        """Get image number from raw image

        Parameters
        ----------
        raw_image_path : Union[Path, str]
            Path to raw image (ENVI header file)

        Returns
        -------
        int
            Image number

        Examples
        --------
        >>> Pipeline._get_image_number("ExampleLocation_Pika_L_5.bil.hdr")
        5

        Notes
        -----
        Raw files are numbered sequentially, but the numbers are not
        zero-padded. This can lead to incorrect sorting of the images, e.g.
        ['im_1','im_2','im_10'] (simplified names for example) are sorted
        ['im_1','im_10','im_2']. By extracting the numbers from filenames
        of raw files and sorting explicitly on these (as integers),
        correct ordering can be achieved.
        """
        raw_image_path = Path(raw_image_path)
        image_file_stem = raw_image_path.name.split(".")[0]
        image_number = image_file_stem.split("_")[-1]
        return int(image_number)

    def _create_base_file_names(self) -> list[str]:
        """Create numbered base names for processed files"""
        if self.data_starting_point == "raw":
            base_file_names = [
                f"{self.dataset_base_name}_{i:03d}" for i in range(len(self.raw_image_paths))
            ]
        else:  # Radiance as data starting point
            base_file_names = sorted(
                [
                    file.name.split("_radiance")[0]
                    for file in self.radiance_dir.glob("*_radiance*.hdr")
                ]
            )
        return base_file_names

    def _create_processed_file_paths(self) -> ProcessedFilePaths:
        """Define default subfolders for processed files.

        For every base file name, one output path per product type is appended
        to the corresponding list in a ProcessedFilePaths container.
        """
        # (ProcessedFilePaths attribute) -> (output directory, file name suffix)
        layout = {
            "quicklook": (self.quicklook_dir, "_quicklook.png"),
            "radiance": (self.radiance_dir, "_radiance.bip.hdr"),
            "radiance_rgb": (self.radiance_rgb_dir, "_radiance_rgb.tiff"),
            "radiance_gc": (self.radiance_gc_dir, "_radiance_gc.bip.hdr"),
            "radiance_gc_rgb": (self.radiance_gc_rgb_dir, "_radiance_gc_rgb.tiff"),
            "irradiance": (self.radiance_dir, "_irradiance.spec.hdr"),
            "imudata": (self.imudata_dir, "_imudata.json"),
            "geotransform": (self.geotransform_dir, "_geotransform.json"),
            "reflectance": (self.reflectance_dir, "_reflectance.bip.hdr"),
            "reflectance_gc": (self.reflectance_gc_dir, "_reflectance_gc.bip.hdr"),
            "reflectance_gc_rgb": (self.reflectance_gc_rgb_dir, "_reflectance_gc_rgb.tiff"),
        }

        proc_file_paths = ProcessedFilePaths()
        for base_file_name in self.base_file_names:
            for attr_name, (directory, suffix) in layout.items():
                getattr(proc_file_paths, attr_name).append(directory / (base_file_name + suffix))
        return proc_file_paths

    def _get_raw_spectrum_paths(self) -> list[Path]:
        """Search for raw files matching Resonon default naming"""
        spec_paths = []
        for raw_image_path in self.raw_image_paths:
            spec_base_name = raw_image_path.name.split("_Pika_L")[0]
            image_number = self._get_image_number(raw_image_path)
            spec_binary = (
                raw_image_path.parent / f"{spec_base_name}_downwelling_{image_number}_pre.spec"
            )
            spec_header = raw_image_path.parent / (spec_binary.name + ".hdr")
            if spec_binary.exists() and spec_header.exists():
                spec_paths.append(spec_header)
            else:
                spec_paths.append(None)
        return spec_paths

    def _get_radiance_calibration_path(self) -> Path:
        """Search for radiance calibration file (*.icp)"""
        icp_files = list(self.calibration_dir.glob("*.icp"))
        if len(icp_files) == 1:
            return icp_files[0]
        elif len(icp_files) == 0:
            raise FileNotFoundError(
                f"No radiance calibration file (*.icp) found in {self.calibration_dir}"
            )
        else:
            raise ValueError(
                "More than one radiance calibration file (*.icp) found in "
                + f"{self.calibration_dir}"
            )

    def _get_irradiance_calibration_path(self) -> Path:
        """Search for irradiance calibration file (*.dcp)"""
        dcp_files = list(self.calibration_dir.glob("*.dcp"))
        if len(dcp_files) == 1:
            return dcp_files[0]
        elif len(dcp_files) == 0:
            raise FileNotFoundError(
                "No irradiance calibration file (*.dcp) found in " + f"{self.calibration_dir}"
            )
        else:
            raise ValueError(
                "More than one irradiance calibration file (*.dcp) found in "
                + f"{self.calibration_dir}"
            )

    def create_quicklook_images(self) -> None:
        """Create quicklook versions of hyperspectral images.

        Source images are raw files if a raw directory exists, otherwise
        radiance files. Existing quicklooks are skipped unless overwrite is
        enabled in the config; per-image errors are logged and do not stop
        the batch.
        """
        logger.info("---- QUICKLOOK IMAGE GENERATION ----")
        self.quicklook_dir.mkdir(exist_ok=True)
        processor = QuickLookProcessor(
            rgb_wl=self.config.general.rgb_wl, percentiles=self.config.quicklook.percentiles
        )

        # Prefer raw images as quicklook source when available
        if self.raw_dir.exists():
            source_paths = self.raw_image_paths
            logger.info(f"Creating quicklook images from raw images")
        else:
            source_paths = self.rad_im_paths
            logger.info(f"Creating quicklook images from radiance images")

        for source_path, output_path in zip(source_paths, self.ql_im_paths):
            if output_path.exists() and not self.config.quicklook.overwrite:
                logger.info(f"Image {output_path.name} exists - skipping.")
                continue

            logger.info(f"Creating quicklook version of {source_path.name}")
            try:
                processor.create_quicklook_image_file(source_path, output_path)
            except Exception:
                logger.exception(f"Error creating quicklook for {source_path.name}")

    def parse_and_save_imu_data(self) -> None:
        """Parse *.lcf and *.times files with IMU data and save as JSON.

        Existing output files are skipped unless overwrite is enabled; errors
        for a single image are logged and do not stop the batch.
        """
        logger.info("---- IMU DATA PROCESSING ----")
        self.imudata_dir.mkdir(exist_ok=True)
        parser = ImuDataParser()
        for lcf_path, times_path, output_path in zip(
            self.lcf_paths, self.times_paths, self.imu_data_paths
        ):
            if output_path.exists() and not self.config.imu_data.overwrite:
                logger.info(f"Image {output_path.name} exists - skipping.")
                continue

            logger.info(f"Processing IMU data from {lcf_path.name}")
            try:
                parser.read_and_save_imu_data(lcf_path, times_path, output_path)
            except Exception:
                logger.exception(f"Error processing IMU data from {lcf_path.name}")

    def create_and_save_geotransform(self) -> None:
        """Calculate affine geotransforms and save them as JSON files.

        Combines each parsed IMU data file with the corresponding hyperspectral
        image (raw if a raw directory exists, otherwise radiance) and writes an
        affine geotransform JSON per image. Images where either input file is
        missing are skipped; per-image errors are logged and do not stop the
        batch. (Fixed: added the missing docstring, removed list
        materialization inside any(), and dropped the unused exception name.)

        Raises
        ------
        FileNotFoundError
            If no IMU data files exist.
        """
        logger.info("---- GEOTRANSFORM CALCULATION ----")
        self.geotransform_dir.mkdir(exist_ok=True)

        if not any(imu_file.exists() for imu_file in self.imu_data_paths):
            raise FileNotFoundError("No IMU data files found.")

        # Determine whether raw or radiance images are used
        if self.raw_dir.exists():
            hyspec_image_paths = self.raw_image_paths
        else:
            hyspec_image_paths = self.rad_im_paths

        # Create geotransform for each image
        for hyspec_im_path, imu_data_path, geotrans_path in zip(
            hyspec_image_paths, self.imu_data_paths, self.geotransform_paths
        ):
            if geotrans_path.exists() and not self.config.geotransform.overwrite:
                logger.info(f"Image {geotrans_path.name} exists - skipping.")
                continue

            logger.info(f"Creating and saving geotransform based on {imu_data_path.name}")
            try:
                # Both inputs must exist; otherwise this image is skipped
                if imu_data_path.exists() and hyspec_im_path.exists():
                    geotransformer = ImuGeoTransformer(
                        imu_data_path,
                        hyspec_im_path,
                        camera_opening_angle=self.config.geotransform.camera_opening_angle_deg,
                        pitch_offset=self.config.geotransform.pitch_offset_deg,
                        roll_offset=self.config.geotransform.roll_offset_deg,
                        altitude_offset=self.config.geotransform.altitude_offset_m,
                        utm_x_offset=self.config.geotransform.utm_x_offset_m,
                        utm_y_offset=self.config.geotransform.utm_y_offset_m,
                        assume_square_pixels=self.config.geotransform.assume_square_pixels,
                    )
                    geotransformer.save_image_geotransform(geotrans_path)
            except Exception:
                logger.exception(f"Error creating geotransform based on {imu_data_path.name}")
                continue

    def convert_raw_images_to_radiance(self) -> None:
        """Convert raw hyperspectral images (DN) to radiance (microflicks).

        Existing radiance images are skipped unless overwrite is enabled;
        per-image errors are logged and do not stop the batch.
        """
        logger.info("---- RADIANCE CONVERSION ----")
        self.radiance_dir.mkdir(exist_ok=True)
        converter = RadianceConverter(
            self.radiance_calibration_file,
            set_saturated_pixels_to_zero=self.config.radiance.set_saturated_pixels_to_zero,
        )
        for raw_path, rad_path in zip(self.raw_image_paths, self.rad_im_paths):
            if rad_path.exists() and not self.config.radiance.overwrite:
                logger.info(f"Image {rad_path.name} exists - skipping.")
                continue

            logger.info(f"Converting {raw_path.name} to radiance")
            try:
                converter.convert_raw_file_to_radiance(raw_path, rad_path)
            except Exception:
                logger.exception(f"Error converting {raw_path.name} to radiance.")

    @staticmethod
    def _create_rgb_geotiff(
        hyspec_paths: list[Path],
        geotransform_paths: list[Path],
        geotiff_paths: list[Path],
        geotiff_overwrite: bool,
        rgb_wl: Union[tuple[float, float, float], None],
    ) -> None:
        """Create georeferenced RGB GeoTIFF versions of hyperspectral images.

        Parameters
        ----------
        hyspec_paths : list[Path]
            Paths to hyperspectral files from which to create RGB images.
            If the file does not exist, the corresponding image is skipped.
        geotransform_paths : list[Path]
            Paths to geotransform files with affine transform for images.
            If the file does not exist, the corresponding image is skipped.
        geotiff_paths : list[Path]
            Paths for output (GeoTIFF) files.
        geotiff_overwrite : bool
            Boolean indicating if existing GeoTIFF files should be overwritten.
        rgb_wl : tuple[float, float, float] or None
            Wavelengths (in nm) to use for red, green and blue. If None,
            handling is delegated to the georeferencer.
        """
        # TODO: Clean up the two possible ways to generate geotiffs
        # (via ENVI header map info or via geotransform file)

        georeferencer = SimpleGeoreferencer(rgb_only=True, rgb_wl=rgb_wl)

        # Nothing to do if no input image exists at all
        if not any(image_path.exists() for image_path in hyspec_paths):
            logger.warning("None of the listed hyperspectral files exist")
            return
        if not any(geotrans_path.exists() for geotrans_path in geotransform_paths):
            logger.info("No geotransform files exist for the hyperspectral images.")

        for hyspec_path, geotrans_path, geotiff_path in zip(
            hyspec_paths, geotransform_paths, geotiff_paths
        ):
            if geotiff_path.exists() and not geotiff_overwrite:
                logger.info(f"Image {geotiff_path.name} exists - skipping.")
                continue

            if hyspec_path.exists():
                logger.info(f"Exporting RGB GeoTIFF from {hyspec_path.name}.")

                if geotrans_path.exists():
                    # Preferred: affine transform from a geotransform JSON file
                    logger.info(f"Using geotransform in file {geotrans_path.name}")
                    try:
                        georeferencer.georeference_hyspec_save_geotiff(
                            hyspec_path,
                            geotrans_path,
                            geotiff_path,
                        )
                    except Exception:
                        logger.exception(
                            f"Error occured while georeferencing RGB version of {hyspec_path}"
                        )

                elif header_contains_mapinfo(hyspec_path):
                    # Fallback: map info embedded in the ENVI header
                    logger.info("Generating GeoTiff using ENVI header map info and rasterio")
                    try:
                        georeferenced_hyspec_to_rgb_geotiff(
                            hyspec_path,
                            geotiff_path,
                            rgb_wl=rgb_wl,
                        )
                    except Exception:
                        logger.exception(
                            f"Error occured while creating RGB version of {hyspec_path}"
                        )

                else:
                    logger.error("No ENVI header map info or geotransform file - skipping.")
                    continue

    def create_radiance_rgb_geotiff(self) -> None:
        """Create georeferenced RGB GeoTIFF versions of radiance images."""
        logger.info("---- EXPORTING RGB GEOTIFF FOR RADIANCE ----")
        self.radiance_rgb_dir.mkdir(exist_ok=True)
        # Delegate per-image work to the shared GeoTIFF helper
        self._create_rgb_geotiff(
            self.rad_im_paths,
            self.geotransform_paths,
            self.rad_rgb_paths,
            self.config.radiance_rgb.overwrite,
            self.config.general.rgb_wl,
        )

    def convert_raw_spectra_to_irradiance(self) -> None:
        """Convert raw spectra (DN) to irradiance (W/(m2*nm)).

        Raw spectrum entries may be None (no spectrum recorded for that image);
        such entries are skipped. Existing irradiance files are skipped unless
        overwrite is enabled; per-file errors are logged and do not stop the
        batch.
        """
        logger.info("---- IRRADIANCE CONVERSION ----")
        self.radiance_dir.mkdir(exist_ok=True)
        converter = IrradianceConverter(self.irradiance_calibration_file)
        for raw_path, irrad_path in zip(self.raw_spec_paths, self.irrad_spec_paths):
            if irrad_path.exists() and not self.config.irradiance.overwrite:
                logger.info(f"Image {irrad_path.name} exists - skipping.")
                continue
            if raw_path is None or not raw_path.exists():
                continue

            logger.info(f"Converting {raw_path.name} to downwelling irradiance")
            try:
                converter.convert_raw_file_to_irradiance(raw_path, irrad_path)
            except Exception:
                logger.exception(f"Irradiance conversion failed for {raw_path.name}")

    def calibrate_irradiance_wavelengths(self) -> None:
        """Calibrate irradiance wavelengths using Fraunhofer absorption lines.

        Fits the calibrator jointly to all irradiance spectra in the radiance
        folder, then updates each spectrum header in place. Does nothing if no
        spectra are found; per-file errors are logged and do not stop the batch.
        """
        logger.info("---- IRRADIANCE WAVELENGTH CALIBRATION ----")
        if not self.radiance_dir.exists():
            raise FileNotFoundError("Radiance folder with irradiance spectra does not exist")
        calibrator = WavelengthCalibrator()
        spec_paths = sorted(self.radiance_dir.glob("*.spec.hdr"))
        if not spec_paths:
            return

        # Fit once across all spectra, then write calibrated wavelengths back
        calibrator.fit_batch(spec_paths)
        for spec_path in spec_paths:
            logger.info(f"Calibrating wavelengths for {spec_path.name}")
            try:
                calibrator.update_header_wavelengths(spec_path)
            except Exception:
                logger.exception(
                    f"Wavelength calibration failed for {spec_path.name}"
                )

    def glint_correct_radiance_images(self) -> None:
        """Remove water surface reflections of sun and sky light

        Fits a HedleyGlintCorrector to the reference images listed in
        `config.radiance_gc.reference_image_numbers` (optionally restricted to
        `reference_image_ranges`), then applies the fitted model to every
        radiance image. Aborts with an error log if no reference images are
        configured; skips entirely if all outputs exist and overwrite is off.

        Raises
        ------
        ValueError
            If reference image ranges are given but their count does not match
            the number of reference image numbers.
        """
        logger.info("---- RADIANCE GLINT CORRECTION ----")
        self.radiance_gc_dir.mkdir(exist_ok=True)

        # Read glint correction reference information from config
        ref_im_nums = self.config.radiance_gc.reference_image_numbers
        ref_im_ranges = self.config.radiance_gc.reference_image_ranges

        if not (ref_im_nums):
            logger.error("No reference images for sun glint correction specified - aborting.")
            return

        if (ref_im_ranges is not None) and (len(ref_im_nums) != len(ref_im_ranges)):
            raise ValueError(
                "The number of reference image numbers and reference image ranges do not match."
            )

        # Skip the whole step if every output already exists and overwrite is off
        if (
            all([rp.exists() for rp in self.rad_gc_im_paths])
            and not self.config.radiance_gc.overwrite
        ):
            logger.info("Glint corrected radiance images already exist - skipping")
            return

        # Fit glint corrector (reference image numbers index into rad_im_paths)
        ref_im_paths = [self.rad_im_paths[im_num] for im_num in ref_im_nums]
        glint_corrector = HedleyGlintCorrector(
            smooth_spectra=self.config.radiance_gc.smooth_spectra,
            subtract_dark_spec=self.config.radiance_gc.subtract_dark_spec,
            set_negative_values_to_zero=self.config.radiance_gc.set_negative_values_to_zero,
        )
        logger.info(f"Fitting glint correction model based on image numbers {ref_im_nums}")
        glint_corrector.fit_to_reference_images(ref_im_paths, ref_im_ranges)

        # Run glint correction
        for rad_image, rad_gc_image in zip(self.rad_im_paths, self.rad_gc_im_paths):
            if rad_gc_image.exists() and not self.config.radiance_gc.overwrite:
                logger.info(f"Image {rad_gc_image.name} exists - skipping.")
                continue
            if rad_image.exists():
                logger.info(f"Running glint correction for {rad_image.name}")
                try:
                    glint_corrector.glint_correct_image_file(rad_image, rad_gc_image)
                except Exception as e:
                    logger.exception(f"Glint correction failed for {rad_image.name}")
                    continue

    def create_glint_corrected_radiance_rgb_geotiff(self) -> None:
        """Create georeferenced GeoTIFF versions of glint corrected radiance."""
        logger.info("---- EXPORTING RGB GEOTIFF FOR GLINT CORRECTED RADIANCE ----")
        self.radiance_gc_rgb_dir.mkdir(exist_ok=True)
        # Delegate per-image work to the shared GeoTIFF helper
        self._create_rgb_geotiff(
            self.rad_gc_im_paths,
            self.geotransform_paths,
            self.rad_gc_rgb_paths,
            self.config.radiance_gc_rgb.overwrite,
            self.config.general.rgb_wl,
        )

    def convert_radiance_images_to_reflectance(self) -> None:
        """Convert radiance images (microflicks) to reflectance (unitless).

        Requires at least one radiance image and one irradiance spectrum.
        Existing reflectance images are skipped unless overwrite is enabled;
        per-image errors are logged and do not stop the batch.

        Raises
        ------
        FileNotFoundError
            If no radiance images or no irradiance spectra exist.
        """
        logger.info("---- REFLECTANCE CONVERSION ----")
        self.reflectance_dir.mkdir(exist_ok=True)
        converter = ReflectanceConverter(
            wl_min=self.config.reflectance.wl_min,
            wl_max=self.config.reflectance.wl_max,
            conv_irrad_with_gauss=self.config.reflectance.conv_irrad_with_gauss,
            smooth_spectra=self.config.reflectance.smooth_spectra,
            refl_from_mean_irrad=self.config.reflectance.refl_from_mean_irrad,
            irrad_spec_paths=self.irrad_spec_paths,
        )

        if not any(rp.exists() for rp in self.rad_im_paths):
            raise FileNotFoundError(f"No radiance images found in {self.radiance_dir}")
        if not any(irp.exists() for irp in self.irrad_spec_paths):
            raise FileNotFoundError(f"No irradiance spectra found in {self.radiance_dir}")

        for rad_path, irrad_path, refl_path in zip(
            self.rad_im_paths, self.irrad_spec_paths, self.refl_im_paths
        ):
            if refl_path.exists() and not self.config.reflectance.overwrite:
                logger.info(f"Image {refl_path.name} exists - skipping.")
                continue
            if not (rad_path.exists() and irrad_path.exists()):
                continue

            logger.info(f"Converting {rad_path.name} to reflectance.")
            try:
                converter.convert_radiance_file_to_reflectance(rad_path, irrad_path, refl_path)
            except Exception:
                logger.exception(
                    f"Error occured while converting {rad_path.name} to reflectance"
                )

    def add_irradiance_to_radiance_header(self) -> None:
        """Pre-process irradiance for reflectance calc. and save to radiance header.

        Raises
        ------
        FileNotFoundError
            If no radiance images or no irradiance spectra exist.
        """
        logger.info("---- WRITING IRRADIANCE TO RADIANCE HEADER ----")
        # wl_min/wl_max are None so the original radiance wavelengths
        # are not modified.
        converter = ReflectanceConverter(
            wl_min=None,
            wl_max=None,
            conv_irrad_with_gauss=self.config.reflectance.conv_irrad_with_gauss,
            smooth_spectra=self.config.reflectance.smooth_spectra,
            refl_from_mean_irrad=self.config.reflectance.refl_from_mean_irrad,
            irrad_spec_paths=self.irrad_spec_paths,
        )

        if not any(rp.exists() for rp in self.rad_im_paths):
            raise FileNotFoundError(f"No radiance images found in {self.radiance_dir}")
        if not any(irp.exists() for irp in self.irrad_spec_paths):
            raise FileNotFoundError(f"No irradiance spectra found in {self.radiance_dir}")

        for rad_path, irrad_path in zip(self.rad_im_paths, self.irrad_spec_paths):
            if not (rad_path.exists() and irrad_path.exists()):
                continue
            logger.info(f"Writing irradiance to radiance header {rad_path.name}.")
            try:
                converter.add_irradiance_spectrum_to_header(rad_path, irrad_path)
            except Exception:
                logger.exception(
                    f"Error occured while adding irradiance spectrum to {rad_path.name}"
                )

    def add_mapinfo_to_radiance_header(self) -> None:
        """Add ENVI mapinfo (geotransform) to radiance header

        Reads the per-image geotransform JSON files and writes the
        corresponding ENVI "map info" entry into each radiance header.
        Per-image errors are logged and do not abort the loop.

        Raises
        ------
        FileNotFoundError
            If no radiance images or no geotransform files exist at all.
        """
        logger.info("---- WRITING MAP INFO TO RADIANCE HEADER ----")
        if not any(rp.exists() for rp in self.rad_im_paths):
            raise FileNotFoundError(f"No radiance images found in {self.radiance_dir}")
        if not any(gtp.exists() for gtp in self.geotransform_paths):
            raise FileNotFoundError(f"No geotransform JSON files found in {self.geotransform_dir}")
        for rad_path, geotrans_path in zip(self.rad_im_paths, self.geotransform_paths):
            logger.info(f"Adding map info to {rad_path.name}")
            try:
                add_header_mapinfo(rad_path, geotrans_path)
            except Exception:
                # Log with traceback and continue with the next image
                logger.exception(f"Error adding map info to {rad_path.name}")

    def glint_correct_reflectance_images(self) -> None:
        """Correct for sun and sky glint in reflectance images

        The method is selected via config.reflectance_gc.method:

        - "from_rad_gc": reflectance is recalculated from glint corrected
          radiance images (requires glint corrected radiance to exist).
        - "flat_spec": existing reflectance images are corrected assuming a
          spectrally flat glint contribution.

        Existing output images are skipped unless config.reflectance_gc.overwrite
        is set. Per-image errors are logged and processing continues.
        """
        logger.info("---- REFLECTANCE GLINT CORRECTION ----")
        self.reflectance_gc_dir.mkdir(exist_ok=True)

        # Calculate reflectance based on glint corrected radiance
        if self.config.reflectance_gc.method == "from_rad_gc":
            logger.info("Calculating glint corrected reflectance from glint corrected radiance")
            # wl_min/wl_max taken from the (non-gc) reflectance config so both
            # reflectance products cover the same wavelength range
            reflectance_converter = ReflectanceConverter(
                wl_min=self.config.reflectance.wl_min,
                wl_max=self.config.reflectance.wl_max,
                smooth_spectra=self.config.reflectance_gc.smooth_spectra,
            )

            # Warn (not raise) so that a partially processed dataset does not abort
            if not any([rp.exists() for rp in self.rad_gc_im_paths]):
                logger.warning(
                    f"No glint corrected radiance images found in {self.radiance_gc_dir}"
                )

            for rad_gc_path, refl_gc_path in zip(self.rad_gc_im_paths, self.refl_gc_im_paths):
                if refl_gc_path.exists() and not self.config.reflectance_gc.overwrite:
                    logger.info(f"Image {refl_gc_path.name} exists - skipping.")
                    continue
                if rad_gc_path.exists():
                    logger.info(f"Calculating reflectance based on {rad_gc_path.name}.")
                    try:
                        reflectance_converter.convert_radiance_file_with_irradiance_to_reflectance(
                            rad_gc_path, refl_gc_path
                        )
                    except KeyError:
                        # Presumably raised when the radiance header lacks an
                        # irradiance spectrum entry -- TODO confirm
                        logger.error(f"Irradiance spectrum missing in {rad_gc_path}")
                        continue
                    except Exception as e:
                        logger.exception(
                            f"Glint corrected reflectance from rad_gc failed for {rad_gc_path.name}"
                        )
                        continue

        # Calculate glint corrected reflectance based on assumption of flat glint spectrum
        elif self.config.reflectance_gc.method == "flat_spec":
            logger.info("Using flat spectrum method for reflectance glint correction")
            glint_corrector = FlatSpecGlintCorrector(
                smooth_with_savitsky_golay=self.config.reflectance_gc.smooth_spectra
            )

            if not any([rp.exists() for rp in self.refl_im_paths]):
                raise FileNotFoundError(f"No reflectance images found in {self.reflectance_dir}")

            for refl_path, refl_gc_path in zip(self.refl_im_paths, self.refl_gc_im_paths):
                if refl_gc_path.exists() and not self.config.reflectance_gc.overwrite:
                    logger.info(f"Image {refl_gc_path.name} exists - skipping.")
                    continue
                if refl_path.exists():
                    logger.info(f"Applying glint correction to {refl_path.name}.")
                    try:
                        glint_corrector.glint_correct_image_file(refl_path, refl_gc_path)
                    except Exception as e:
                        logger.exception(f"Flat spec glint correction failed for {refl_path.name}")
                        continue
        else:
            logger.error("Unrecognized glint correction method specified in configuration.")

    def create_glint_corrected_reflectance_rgb_geotiff(self) -> None:
        """Create georeferenced GeoTIFF versions of glint corrected reflectance"""
        logger.info("---- EXPORTING RGB GEOTIFF FOR GLINT CORRECTED REFLECTANCE ----")
        self.reflectance_gc_rgb_dir.mkdir(exist_ok=True)

        # Delegate to the shared RGB GeoTIFF helper, feeding it the glint
        # corrected reflectance inputs and outputs
        self._create_rgb_geotiff(
            hyspec_paths=self.refl_gc_im_paths,
            geotransform_paths=self.geotransform_paths,
            geotiff_paths=self.refl_gc_rgb_paths,
            geotiff_overwrite=self.config.reflectance_gc.overwrite,
            rgb_wl=self.config.general.rgb_wl,
        )

    @staticmethod
    def _mosaic_geotiffs(
        geotiff_paths: list[Path],
        mosaic_path: Path,
        mosaic_overwrite: bool,
        overview_factors: Sequence[int],
        convert_to_8bit: bool = True,
    ) -> None:
        """Mosaic GeoTIFF images into single GeoTIFF with overviews

        Parameters
        ----------
        geotiff_paths : list[Path]
            List of GeoTIFFs to mosaic
        mosaic_path : Path
            Path to output GeoTIFF
        mosaic_overwrite : bool
            Whether to overwrite existing output GeoTIFF.
        overview_factors : Sequence[int]
            GeoTIFF overview factors, typically powers of 2
            Used to add "pyramid" of lower-resolution images for faster image browsing.
        convert_to_8bit : bool, optional
            If true, the mosaic is percentile stretched and converted to 8-bit.
            This decreases file size and enhances contrast, at the cost of losing the
            (physical) units of the original image.
        """
        if mosaic_path.exists() and not mosaic_overwrite:
            logger.info(f"Mosaic {mosaic_path.name} already exists - skipping.")
            return

        # Nothing to mosaic if no input images have been produced yet
        if not any(gtp.exists() for gtp in geotiff_paths):
            logger.warning("None of the listed GeoTIFFs exist.")
            return

        mosaic_geotiffs(geotiff_paths, mosaic_path)
        if convert_to_8bit:
            # Input and output paths are identical: conversion happens in-place
            convert_geotiff_to_8bit(input_image_path=mosaic_path, output_image_path=mosaic_path)
        add_geotiff_overviews(mosaic_path, overview_factors)

    def mosaic_radiance_geotiffs(self) -> None:
        """Merge radiance RGB images into mosaic with overviews"""
        logger.info("---- MOSAICING RADIANCE ----")
        self.mosaic_dir.mkdir(exist_ok=True)

        # Merging, 8-bit conversion and overview generation are handled by
        # the shared mosaicing helper
        mosaic_config = self.config.mosaic
        self._mosaic_geotiffs(
            geotiff_paths=self.rad_rgb_paths,
            mosaic_path=self.mosaic_rad_path,
            mosaic_overwrite=mosaic_config.radiance_rgb.overwrite,
            overview_factors=mosaic_config.overview_factors,
        )

    def mosaic_radiance_gc_geotiffs(self) -> None:
        """Merge radiance_gc RGB images into mosaic with overviews"""
        logger.info("---- MOSAICING GLINT CORRECTED RADIANCE ----")
        self.mosaic_dir.mkdir(exist_ok=True)

        # Same shared helper as the other mosaicing steps, with glint
        # corrected radiance inputs
        mosaic_config = self.config.mosaic
        self._mosaic_geotiffs(
            geotiff_paths=self.rad_gc_rgb_paths,
            mosaic_path=self.mosaic_rad_gc_path,
            mosaic_overwrite=mosaic_config.radiance_gc_rgb.overwrite,
            overview_factors=mosaic_config.overview_factors,
        )

    def mosaic_reflectance_gc_geotiffs(self) -> None:
        """Merge reflectance_gc RGB images into mosaic with overviews"""
        logger.info("---- MOSAICING GLINT CORRECTED REFLECTANCE ----")
        self.mosaic_dir.mkdir(exist_ok=True)

        # Same shared helper as the other mosaicing steps, with glint
        # corrected reflectance inputs
        mosaic_config = self.config.mosaic
        self._mosaic_geotiffs(
            geotiff_paths=self.refl_gc_rgb_paths,
            mosaic_path=self.mosaic_refl_gc_path,
            mosaic_overwrite=mosaic_config.reflectance_gc_rgb.overwrite,
            overview_factors=mosaic_config.overview_factors,
        )

    def delete_existing_products(
        self,
        delete_quicklook: bool = True,
        delete_radiance: bool = True,
        delete_radiance_gc: bool = True,
        delete_reflectance: bool = True,
        delete_reflectance_gc: bool = True,
        delete_geotransform: bool = True,
        delete_imudata: bool = True,
        delete_mosaics: bool = True,
    ) -> None:
        """Delete existing image products ("reset" after previous processing)

        Parameters
        ----------
        delete_quicklook : bool, optional
            If true, delete 0b_quicklook folder (if it exists)
        delete_radiance : bool, optional
            If true, delete 1a_radiance folder (if it exists)
            If data "starting point" is radiance and not raw files,
            radiance will not be deleted (delete manually if needed).
        delete_radiance_gc : bool, optional
            If true, delete 1b_radiance folder (if it exists)
        delete_reflectance : bool, optional
            If true, delete 2a_reflectance folder (if it exists)
        delete_reflectance_gc : bool, optional
            If true, delete 2b_reflectance folder (if it exists)
        delete_geotransform : bool, optional
            If true, delete geotransform folder (if it exists)
        delete_imudata : bool, optional
            If true, delete imudata folder (if it exists)
            If data "starting point" is radiance and not raw files,
            imu data will not be deleted (delete manually if needed).
        delete_mosaics : bool, optional
            If true, delete mosaics folder (if it exists)
        """
        # Radiance and IMU data are the pipeline *inputs* when processing
        # starts from radiance - never delete them in that case.
        radiance_is_input = self.data_starting_point == "radiance"

        # (directory, delete flag) pairs - deduplicates eight identical
        # exists/log/rmtree stanzas into one loop
        deletion_targets = [
            (self.quicklook_dir, delete_quicklook),
            (self.radiance_dir, delete_radiance and not radiance_is_input),
            (self.radiance_gc_dir, delete_radiance_gc),
            (self.reflectance_dir, delete_reflectance),
            (self.reflectance_gc_dir, delete_reflectance_gc),
            (self.geotransform_dir, delete_geotransform),
            (self.imudata_dir, delete_imudata and not radiance_is_input),
            (self.mosaic_dir, delete_mosaics),
        ]
        for dir_path, delete_enabled in deletion_targets:
            if dir_path.exists() and delete_enabled:
                logger.info(f"Deleting {dir_path}")
                shutil.rmtree(dir_path)

    def run_quicklook(self) -> None:
        """Create quicklook versions of images (percentile stretched)"""
        if not self.config.quicklook.create:
            return
        try:
            self.create_quicklook_images()
        except Exception:
            logger.exception("Error while creating quicklook images")

    def run_raw_data_processing(self) -> None:
        """Run all data processing steps based on raw data"""

        def attempt(step, error_message):
            # Run a single step; log errors (with traceback) instead of raising,
            # so that later steps still get a chance to run.
            try:
                step()
            except Exception:
                logger.exception(error_message)

        if self.config.imu_data.create:
            attempt(self.parse_and_save_imu_data, "Error while parsing and saving IMU data")

        if self.config.radiance.create:
            attempt(
                self.convert_raw_images_to_radiance,
                "Error while converting raw images to radiance",
            )

        if self.config.irradiance.create:
            attempt(
                self.convert_raw_spectra_to_irradiance,
                "Error while converting raw spectra to irradiance",
            )
            attempt(
                self.calibrate_irradiance_wavelengths,
                "Error while calibrating irradiance wavelengths",
            )

    def run_secondary_processing(self) -> None:
        """Run processing steps that build on radiance images and IMU data

        Each step is controlled by a config flag; errors in one step are
        logged and do not prevent the following steps from running.
        """
        if self.config.geotransform.create:
            try:
                self.create_and_save_geotransform()
            except Exception:
                # Fixed copy-pasted log message (previously referred to IMU data)
                logger.exception("Error while creating and saving geotransform")

        if self.config.radiance.add_irradiance_to_header:
            try:
                self.add_irradiance_to_radiance_header()
            except Exception:
                logger.exception("Error while adding irradiance to radiance header")

        if self.config.radiance.add_envi_mapinfo_to_header:
            try:
                self.add_mapinfo_to_radiance_header()
            except Exception:
                logger.exception("Error while adding map info to radiance header")

        if self.config.radiance_rgb.create:
            try:
                self.create_radiance_rgb_geotiff()
            except Exception:
                # Fixed "GeoTOFFs" typo in log message
                logger.exception("Error while creating RGB GeoTIFFs from radiance")

        if self.config.reflectance.create:
            try:
                self.convert_radiance_images_to_reflectance()
            except FileNotFoundError:
                logger.exception(
                    "Missing input radiance / irradiance files, skipping reflectance conversion."
                )
            except Exception:
                logger.exception("Error while converting from radiance to reflectance")

    def run_glint_correction(self) -> None:
        """Run glint correction using parameters defined in YAML file

        The processing steps include:
            - Fitting a glint correction model to radiance images
            - Running glint correction and saving glint corrected images
            - Creating RGB GeoTiff versions of glint corrected images

        See massipipe.config.Config and template YAML file for all options.

        """

        if self.config.radiance_gc.create:
            try:
                self.glint_correct_radiance_images()
            except Exception:
                logger.exception("Error while glint correcting radiance images")

        if self.config.radiance_gc_rgb.create:
            try:
                self.create_glint_corrected_radiance_rgb_geotiff()
            except Exception:
                logger.exception("Error while creating RGB GeoTIFFs from glint corrected radiance")

        if self.config.reflectance_gc.create:
            try:
                self.glint_correct_reflectance_images()
            except Exception:
                # Fixed copy-pasted log message (previously said "radiance")
                logger.exception("Error while glint correcting reflectance images")

        if self.config.reflectance_gc_rgb.create:
            try:
                self.create_glint_corrected_reflectance_rgb_geotiff()
            except Exception:
                # Fixed copy-pasted log message (previously said "radiance")
                logger.exception(
                    "Error while creating RGB GeoTIFFs from glint corrected reflectance"
                )

    def run_mosaics(self) -> None:
        """Run all mosaicing operations

        Each mosaic type is controlled by a config flag; errors are logged
        (with traceback) and do not prevent the remaining mosaics.
        """
        if self.config.mosaic.radiance_rgb.create:
            try:
                self.mosaic_radiance_geotiffs()
            except Exception:
                # logger.exception already records the traceback;
                # the previous exc_info=True was redundant
                logger.exception("Error occured while mosaicing radiance")

        if self.config.mosaic.radiance_gc_rgb.create:
            try:
                self.mosaic_radiance_gc_geotiffs()
            except Exception:
                # Changed logger.error -> logger.exception so the traceback
                # is logged, consistent with the other mosaicing branches
                logger.exception("Error occured while mosaicing glint corrected radiance")

        if self.config.mosaic.reflectance_gc_rgb.create:
            try:
                self.mosaic_reflectance_gc_geotiffs()
            except Exception:
                logger.exception("Error occured while mosaicing glint corrected reflectance")

    def run(self) -> None:
        """Run all processing steps

        Executes quicklook generation, raw data processing (only when the
        dataset starts from raw files), secondary processing, glint
        correction, and mosaicing, in that order. Each run_* stage logs
        and absorbs its own per-step errors.
        """
        self.run_quicklook()
        # Raw-specific steps (IMU parsing, radiance/irradiance conversion)
        # only apply when processing starts from raw files
        if self.data_starting_point == "raw":
            self.run_raw_data_processing()
        self.run_secondary_processing()
        self.run_glint_correction()
        self.run_mosaics()

    def export(self) -> None:
        """Export dataset to ZIP file for archival / publishing"""
        # Pick which mosaic ("best" per configuration) to use for visualization
        if self.config.mosaic.visualization_mosaic == "radiance":
            visualization_source = self.mosaic_rad_path
        else:
            visualization_source = self.mosaic_rad_gc_path
        copy_visualization_mosaic(visualization_source, self.mosaic_visualization_dir)

        # Package selected processed data as ZIP file
        zip_file_path = export_dataset_zip(
            self.dataset_dir,
            self.quicklook_dir,
            self.radiance_dir,
            self.imudata_dir,
            self.mosaic_visualization_dir,
            self.config_file_path,
        )
        logger.info(f"Dataset exported to {zip_file_path}")

__init__(dataset_dir, config_file_name='config.seabee.yaml')

Parameters:

Name Type Description Default
dataset_dir Union[Path, str]

Path to the dataset directory containing required subfolders. The required subfolders are either "0_raw" and "calibration" (for processing from raw data), or "1a_radiance" and "imudata" (for processing from radiance data).

required
config_file_name str

Name of the YAML configuration file. A template is generated if the file does not exist.

'config.seabee.yaml'

Raises:

Type Description
FileNotFoundError

If required folders (e.g. "0_raw" or "calibration") are missing.

Source code in massipipe/pipeline.py
def __init__(
    self,
    dataset_dir: Union[Path, str],
    config_file_name: str = "config.seabee.yaml",
) -> None:
    """Initialize the pipeline for dataset processing.

    Parameters
    ----------
    dataset_dir : Union[Path, str]
        Path to the dataset directory containing required subfolders. The required
        subfolders are either "0_raw" and "calibration" (for processing from raw
        data), or "1a_radiance" and "imudata" (for processing from radiance data).
    config_file_name : str, optional
        Name of the YAML configuration file. A template is generated if the file
        does not exist.

    Raises
    ------
    FileNotFoundError
        If required folders (e.g. "0_raw" or "calibration") are missing.
    """

    # Set dataset directory
    self.dataset_dir = Path(dataset_dir)

    # Define dataset folder structure
    # (paths are only defined here, not created - processing steps create
    # their own output directories on demand)
    self.dataset_base_name = self.dataset_dir.name
    self.raw_dir = self.dataset_dir / "0_raw"
    self.quicklook_dir = self.dataset_dir / "quicklook"
    self.radiance_dir = self.dataset_dir / "1a_radiance"
    self.radiance_rgb_dir = self.radiance_dir / "rgb"
    self.radiance_gc_dir = self.dataset_dir / "1b_radiance_gc"
    self.radiance_gc_rgb_dir = self.radiance_gc_dir / "rgb"
    self.reflectance_dir = self.dataset_dir / "2a_reflectance"
    self.reflectance_gc_dir = self.dataset_dir / "2b_reflectance_gc"
    self.reflectance_gc_rgb_dir = self.reflectance_gc_dir / "rgb"
    self.imudata_dir = self.dataset_dir / "imudata"
    self.geotransform_dir = self.dataset_dir / "geotransform"
    self.mosaic_dir = self.dataset_dir / "mosaics"
    self.mosaic_visualization_dir = self.dataset_dir / "orthomosaic"
    self.calibration_dir = self.dataset_dir / "calibration"
    self.logs_dir = self.dataset_dir / "logs"

    # Configure logging (adds a timestamped file handler in logs_dir)
    self._configure_file_logging()

    # Read config file, exporting a template first if no config file exists
    config_file_path = self.dataset_dir / config_file_name
    self.config_file_path = config_file_path
    if not self.config_file_path.exists():
        logger.info(f"No config file found - exporting template file {config_file_name}")
        export_template_yaml(self.config_file_path)
    self.load_config_from_file()  # Reads config from file into self.config

    # Check if data source is raw data or radiance
    self.data_starting_point = self._check_data_starting_point()

    if self.data_starting_point == "raw":
        # Get calibration file paths
        if not self.calibration_dir.exists():
            raise FileNotFoundError(f'Folder "calibration" not found in {self.dataset_dir}')
        self.radiance_calibration_file = self._get_radiance_calibration_path()
        self.irradiance_calibration_file = self._get_irradiance_calibration_path()

        # Search for raw files, sort and validate
        # (sorting uses the integer image number, since raw file numbers are
        # not zero-padded and would sort incorrectly as strings)
        self.raw_image_paths = list(self.raw_dir.rglob("*.bil.hdr"))
        self.raw_image_paths = sorted(self.raw_image_paths, key=self._get_image_number)
        times_paths, lcf_paths = self._validate_raw_files()
        self.times_paths = times_paths
        self.lcf_paths = lcf_paths

        # Search for raw irradiance spectrum files (not always present)
        self.raw_spec_paths = self._get_raw_spectrum_paths()

    # Create "base" file names numbered from 0
    self.base_file_names = self._create_base_file_names()

    # Create lists of processed file paths
    # (paths are defined for all product types; files may not exist yet)
    proc_file_paths = self._create_processed_file_paths()
    self.ql_im_paths = proc_file_paths.quicklook
    self.rad_im_paths = proc_file_paths.radiance
    self.rad_rgb_paths = proc_file_paths.radiance_rgb
    self.rad_gc_im_paths = proc_file_paths.radiance_gc
    self.rad_gc_rgb_paths = proc_file_paths.radiance_gc_rgb
    self.irrad_spec_paths = proc_file_paths.irradiance
    self.imu_data_paths = proc_file_paths.imudata
    self.geotransform_paths = proc_file_paths.geotransform
    self.refl_im_paths = proc_file_paths.reflectance
    self.refl_gc_im_paths = proc_file_paths.reflectance_gc
    self.refl_gc_rgb_paths = proc_file_paths.reflectance_gc_rgb

    # Create mosaic file paths
    self.mosaic_rad_path = self.mosaic_dir / (self.dataset_base_name + "_rad_rgb.tiff")
    self.mosaic_rad_gc_path = self.mosaic_dir / (self.dataset_base_name + "_rad_gc_rgb.tiff")
    self.mosaic_refl_gc_path = self.mosaic_dir / (self.dataset_base_name + "_refl_gc_rgb.tiff")

load_config_from_file()

Load or reload configuration from a YAML file.

Reads the configuration from self.config_file_path, validates it using Pydantic, and assigns the loaded options to self.config. Logs warnings on validation errors.

Source code in massipipe/pipeline.py
def load_config_from_file(self) -> None:
    """Load or reload configuration from a YAML file.

    Reads the configuration from `self.config_file_path`, validates it using Pydantic,
    and assigns the loaded options to `self.config`. Logs warnings on validation errors.
    """
    # I/O errors are fatal: log with traceback and re-raise
    try:
        raw_config = read_config(self.config_file_path)
    except IOError:
        logger.exception(f"Error parsing config file {self.config_file_path}")
        raise

    # Validation errors are not fatal: warn and leave self.config untouched
    try:
        full_config = Config(**raw_config)
    except ValidationError as validation_error:
        logger.warning(f"Validation error while processing {self.config_file_path}")
        logger.warning(str(validation_error))
        logger.warning(f"No configuration loaded for {self.dataset_dir}.")
        return

    self.config = full_config.massipipe_options
    logger.info(f"Pipeline configured with \n{pprint.pformat(full_config.model_dump())}")

_configure_file_logging()

Configure file logging for pipeline execution.

Creates a logs directory if needed and initializes a file handler with a timestamped log file.

Source code in massipipe/pipeline.py
def _configure_file_logging(self) -> None:
    """Configure file logging for pipeline execution.

    Creates a logs directory if needed and initializes a file handler with a
    timestamped log file.
    """
    # Build a timestamped log file path inside the dataset's logs directory
    self.logs_dir.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    log_path = self.logs_dir / f"{timestamp}_{self.dataset_base_name}.log"

    # Attach a file handler to the module-level logger
    handler = logging.FileHandler(log_path)
    handler.setFormatter(
        logging.Formatter(fmt="%(asctime)s %(levelname)s: %(message)s", datefmt="%H:%M:%S")
    )
    handler.setLevel(logging.INFO)
    logger.addHandler(handler)
    logger.info("-" * 64)
    logger.info(f"File logging for {self.dataset_base_name} initialized.")

_check_data_starting_point()

Determine if processing should start from raw or radiance data.

Returns:

Type Description
str

"raw" if raw data exists and "radiance" if radiance data exists.

Raises:

Type Description
FileNotFoundError

If neither raw nor radiance data is found in the dataset directory.

Source code in massipipe/pipeline.py
def _check_data_starting_point(self) -> str:
    """Determine if processing should start from raw or radiance data.

    Returns
    -------
    str
        "raw" if raw data exists and "radiance" if radiance data exists.

    Raises
    ------
    FileNotFoundError
        If neither raw nor radiance data is found in the dataset directory.
    """
    # Raw data takes precedence when both directories exist
    if self.raw_dir.exists():
        logger.info(f"Found directory {self.raw_dir} - processing from raw files.")
        return "raw"
    if self.radiance_dir.exists():
        logger.info(
            f"No raw files found, but radiance directory {self.radiance_dir.name} found"
            + " - processing based on radiance data."
        )
        return "radiance"
    # Fixed grammar in error message ("neither ... or" -> "neither ... nor")
    # and removed f-prefix from constant string
    raise FileNotFoundError("Found neither raw nor radiance files in dataset dir.")

_validate_raw_files()

Check that all expected raw files exist

Returns:

times_paths, lcf_paths: list[Path] Lists of paths to .times and .lcf files for every valid raw file

Source code in massipipe/pipeline.py
def _validate_raw_files(self) -> tuple[list[Path], list[Path]]:
    """Check that all expected raw files exist

    Returns:
    --------
    times_paths, lcf_paths: list[Path]
        Lists of paths to *.times and *.lcf files for every valid raw file
    """
    times_paths: list[Path] = []
    lcf_paths: list[Path] = []
    # Iterate over a copy, so invalid entries can be removed from the original
    for raw_image_path in list(self.raw_image_paths):
        base_name = raw_image_path.name.split(".")[0]
        binary_path = raw_image_path.parent / (base_name + ".bil")
        times_path = raw_image_path.parent / (base_name + ".bil.times")
        lcf_path = raw_image_path.parent / (base_name + ".lcf")
        if all(p.exists() for p in (binary_path, times_path, lcf_path)):
            times_paths.append(times_path)
            lcf_paths.append(lcf_path)
        else:
            # Incomplete sets are dropped from further processing
            logger.warning(f"Set of raw files for image {raw_image_path} is incomplete.")
            self.raw_image_paths.remove(raw_image_path)
    return times_paths, lcf_paths

_get_image_number(raw_image_path) staticmethod

Get image number from raw image

Parameters:

Name Type Description Default
raw_image_path Union[Path, str]

Path to raw image (ENVI header file)

required

Returns:

Type Description
int

Image number

Examples:

>>> Pipeline._get_image_number("ExampleLocation_Pika_L_5.bil.hdr")
5
Notes

Raw files are numbered sequentially, but the numbers are not zero-padded. This can lead to incorrect sorting of the images, e.g. ['im_1','im_2','im_10'] (simplified names for example) are sorted ['im_1','im_10','im_2']. By extracting the numbers from filenames of raw files and sorting explicitly on these (as integers), correct ordering can be achieved.

Source code in massipipe/pipeline.py
@staticmethod
def _get_image_number(raw_image_path: Union[Path, str]) -> int:
    """Get image number from raw image

    Parameters
    ----------
    raw_image_path : Union[Path, str]
        Path to raw image (ENVI header file)

    Returns
    -------
    int
        Image number

    Examples
    --------
    >>> Pipeline._get_image_number("ExampleLocation_Pika_L_5.bil.hdr")
    5

    Notes
    -----
    Raw files are numbered sequentially, but the numbers are not
    zero-padded. This can lead to incorrect sorting of the images, e.g.
    ['im_1','im_2','im_10'] (simplified names for example) are sorted
    ['im_1','im_10','im_2']. By extracting the numbers from filenames
    of raw files and sorting explicitly on these (as integers),
    correct ordering can be achieved.
    """
    raw_image_path = Path(raw_image_path)
    image_file_stem = raw_image_path.name.split(".")[0]
    image_number = image_file_stem.split("_")[-1]
    return int(image_number)

_create_base_file_names()

Create numbered base names for processed files

Source code in massipipe/pipeline.py
def _create_base_file_names(self) -> list[str]:
    """Create numbered base names for processed files"""
    if self.data_starting_point == "raw":
        # One zero-padded, three-digit index per raw image
        return [
            f"{self.dataset_base_name}_{image_index:03d}"
            for image_index in range(len(self.raw_image_paths))
        ]
    # Radiance starting point: derive base names from existing radiance headers
    radiance_headers = self.radiance_dir.glob("*_radiance*.hdr")
    return sorted(header.name.split("_radiance")[0] for header in radiance_headers)

_create_processed_file_paths()

Define default subfolders for processed files

Source code in massipipe/pipeline.py
def _create_processed_file_paths(self) -> ProcessedFilePaths:
    """Define default subfolders for processed files

    Returns
    -------
    ProcessedFilePaths
        Container with one list of output paths per product type; every list
        has one entry per base file name, in the same order.
    """
    proc_file_paths = ProcessedFilePaths()

    # (target list, output directory, file name suffix) per product type.
    # Deduplicates eleven near-identical append stanzas into one loop.
    # Note: irradiance spectra are stored alongside the radiance images.
    output_specs = [
        (proc_file_paths.quicklook, self.quicklook_dir, "_quicklook.png"),
        (proc_file_paths.radiance, self.radiance_dir, "_radiance.bip.hdr"),
        (proc_file_paths.radiance_rgb, self.radiance_rgb_dir, "_radiance_rgb.tiff"),
        (proc_file_paths.radiance_gc, self.radiance_gc_dir, "_radiance_gc.bip.hdr"),
        (proc_file_paths.radiance_gc_rgb, self.radiance_gc_rgb_dir, "_radiance_gc_rgb.tiff"),
        (proc_file_paths.irradiance, self.radiance_dir, "_irradiance.spec.hdr"),
        (proc_file_paths.imudata, self.imudata_dir, "_imudata.json"),
        (proc_file_paths.geotransform, self.geotransform_dir, "_geotransform.json"),
        (proc_file_paths.reflectance, self.reflectance_dir, "_reflectance.bip.hdr"),
        (proc_file_paths.reflectance_gc, self.reflectance_gc_dir, "_reflectance_gc.bip.hdr"),
        (
            proc_file_paths.reflectance_gc_rgb,
            self.reflectance_gc_rgb_dir,
            "_reflectance_gc_rgb.tiff",
        ),
    ]

    for base_file_name in self.base_file_names:
        for path_list, output_dir, suffix in output_specs:
            path_list.append(output_dir / (base_file_name + suffix))

    return proc_file_paths

_get_raw_spectrum_paths()

Search for raw files matching Resonon default naming

Source code in massipipe/pipeline.py
def _get_raw_spectrum_paths(self) -> list[Union[Path, None]]:
    """Search for raw downwelling spectrum files matching Resonon default naming

    Returns
    -------
    list[Union[Path, None]]
        One entry per raw image, in the same order as self.raw_image_paths:
        the path to the matching *.spec.hdr header if both the binary and
        header files exist, otherwise None.
    """
    spec_paths: list[Union[Path, None]] = []
    for raw_image_path in self.raw_image_paths:
        # Resonon image names look like "<base>_Pika_L_<number>..."; the
        # downwelling spectrum shares the base name and image number.
        spec_base_name = raw_image_path.name.split("_Pika_L")[0]
        image_number = self._get_image_number(raw_image_path)
        spec_binary = (
            raw_image_path.parent / f"{spec_base_name}_downwelling_{image_number}_pre.spec"
        )
        spec_header = raw_image_path.parent / (spec_binary.name + ".hdr")
        if spec_binary.exists() and spec_header.exists():
            spec_paths.append(spec_header)
        else:
            # Keep a placeholder so the list stays aligned with raw_image_paths
            spec_paths.append(None)
    return spec_paths

_get_radiance_calibration_path()

Search for radiance calibration file (*.icp)

Source code in massipipe/pipeline.py
def _get_radiance_calibration_path(self) -> Path:
    """Locate the single radiance calibration file (*.icp) in the calibration folder

    Raises FileNotFoundError if no file is found, ValueError if the match
    is ambiguous (more than one *.icp file present).
    """
    matches = list(self.calibration_dir.glob("*.icp"))
    if not matches:
        raise FileNotFoundError(
            f"No radiance calibration file (*.icp) found in {self.calibration_dir}"
        )
    if len(matches) > 1:
        raise ValueError(
            "More than one radiance calibration file (*.icp) found in "
            + f"{self.calibration_dir}"
        )
    return matches[0]

_get_irradiance_calibration_path()

Search for irradiance calibration file (*.dcp)

Source code in massipipe/pipeline.py
def _get_irradiance_calibration_path(self) -> Path:
    """Locate the single irradiance calibration file (*.dcp) in the calibration folder

    Raises FileNotFoundError if no file is found, ValueError if the match
    is ambiguous (more than one *.dcp file present).
    """
    matches = list(self.calibration_dir.glob("*.dcp"))
    if not matches:
        raise FileNotFoundError(
            "No irradiance calibration file (*.dcp) found in " + f"{self.calibration_dir}"
        )
    if len(matches) > 1:
        raise ValueError(
            "More than one irradiance calibration file (*.dcp) found in "
            + f"{self.calibration_dir}"
        )
    return matches[0]

create_quicklook_images()

Create quicklook versions of hyperspectral images

Source code in massipipe/pipeline.py
def create_quicklook_images(self) -> None:
    """Create quicklook versions of hyperspectral images

    Generates percentile-stretched preview images for every raw image, or for
    every radiance image if no raw data folder exists. Existing quicklooks are
    skipped unless overwriting is enabled in the configuration, and a failure
    on one image does not stop processing of the remaining images.
    """
    logger.info("---- QUICKLOOK IMAGE GENERATION ----")
    self.quicklook_dir.mkdir(exist_ok=True)
    quicklook_processor = QuickLookProcessor(
        rgb_wl=self.config.general.rgb_wl, percentiles=self.config.quicklook.percentiles
    )

    # Determine whether raw or radiance images are used for quicklook
    if self.raw_dir.exists():
        hyspec_image_paths = self.raw_image_paths
        logger.info("Creating quicklook images from raw images")
    else:
        hyspec_image_paths = self.rad_im_paths
        logger.info("Creating quicklook images from radiance images")

    for hyspec_image_path, quicklook_image_path in zip(hyspec_image_paths, self.ql_im_paths):
        if quicklook_image_path.exists() and not self.config.quicklook.overwrite:
            logger.info(f"Image {quicklook_image_path.name} exists - skipping.")
            continue

        logger.info(f"Creating quicklook version of {hyspec_image_path.name}")
        try:
            quicklook_processor.create_quicklook_image_file(
                hyspec_image_path, quicklook_image_path
            )
        except Exception:
            # Best-effort batch: log with traceback and move on
            logger.exception(f"Error creating quicklook for {hyspec_image_path.name}")
            continue

parse_and_save_imu_data()

Parse .lcf and .times files with IMU data and save as JSON

Source code in massipipe/pipeline.py
def parse_and_save_imu_data(self) -> None:
    """Parse *.lcf and *.times files with IMU data and save as JSON

    Iterates over matched (lcf, times, output) path triples. Existing output
    files are skipped unless overwriting is enabled; a failure on one dataset
    does not stop processing of the rest.
    """
    logger.info("---- IMU DATA PROCESSING ----")
    self.imudata_dir.mkdir(exist_ok=True)
    imu_data_parser = ImuDataParser()
    for lcf_path, times_path, imu_data_path in zip(
        self.lcf_paths, self.times_paths, self.imu_data_paths
    ):
        if imu_data_path.exists() and not self.config.imu_data.overwrite:
            logger.info(f"Image {imu_data_path.name} exists - skipping.")
            continue

        logger.info(f"Processing IMU data from {lcf_path.name}")
        try:
            imu_data_parser.read_and_save_imu_data(lcf_path, times_path, imu_data_path)
        except Exception:
            # Log with traceback and continue with the next dataset
            logger.exception(f"Error processing IMU data from {lcf_path.name}")
            continue

convert_raw_images_to_radiance()

Convert raw hyperspectral images (DN) to radiance (microflicks)

Source code in massipipe/pipeline.py
def convert_raw_images_to_radiance(self) -> None:
    """Convert raw hyperspectral images (DN) to radiance (microflicks)

    Existing radiance images are skipped unless overwriting is enabled;
    a failure on one image does not stop processing of the rest.
    """
    logger.info("---- RADIANCE CONVERSION ----")
    self.radiance_dir.mkdir(exist_ok=True)
    radiance_converter = RadianceConverter(
        self.radiance_calibration_file,
        set_saturated_pixels_to_zero=self.config.radiance.set_saturated_pixels_to_zero,
    )
    for raw_image_path, radiance_image_path in zip(self.raw_image_paths, self.rad_im_paths):
        if radiance_image_path.exists() and not self.config.radiance.overwrite:
            logger.info(f"Image {radiance_image_path.name} exists - skipping.")
            continue

        logger.info(f"Converting {raw_image_path.name} to radiance")
        try:
            radiance_converter.convert_raw_file_to_radiance(raw_image_path, radiance_image_path)
        except Exception:
            # Log with traceback and continue with the next image
            logger.exception(f"Error converting {raw_image_path.name} to radiance.")
            continue

_create_rgb_geotiff(hyspec_paths, geotransform_paths, geotiff_paths, geotiff_overwrite, rgb_wl) staticmethod

Create georeferenced RGB GeoTIFF versions of hyperspectral image

Parameters:

Name Type Description Default
hyspec_paths list[Path]

Paths to hyperspectral files from which to create RGB images. If the file does not exist, the corresponding image is skipped.

required
geotransform_paths list[Path]

Paths to geotransform files with affine transform for images. If the file does not exist, the corresponding image is skipped.

required
geotiff_paths list[Path]

Paths for output (GeoTIFF) files

required
geotiff_overwrite bool

Boolean indicating if existing GeoTIFF files should be overwritten.

required
rgb_wl tuple[float, float, float]

Wavelengths (in nm) to use for red, green and blue.

required
Source code in massipipe/pipeline.py
@staticmethod
def _create_rgb_geotiff(
    hyspec_paths: list[Path],
    geotransform_paths: list[Path],
    geotiff_paths: list[Path],
    geotiff_overwrite: bool,
    rgb_wl: Union[tuple[float, float, float], None],
) -> None:
    """Create georeferenced RGB GeoTIFF versions of hyperspectral image

    Parameters
    ----------
    hyspec_paths : list[Path]
        Paths to hyperspectral files from which to create RGB images.
        If the file does not exist, the corresponding image is skipped.
    geotransform_paths : list[Path]
        Paths to geotransform files with affine transform for images.
        If the file does not exist, the corresponding image is skipped.
    geotiff_paths : list[Path]
        Paths for output (GeoTIFF) files
    geotiff_overwrite : bool
        Boolean indicating if existing GeoTIFF files should be overwritten.
    rgb_wl : tuple[float, float, float]
        Wavelengths (in nm) to use for red, green and blue.
    """
    # TODO: Clean up the two possible ways to generate geotiffs
    # (via ENVI header map info or via geotransform file)

    georeferencer = SimpleGeoreferencer(rgb_only=True, rgb_wl=rgb_wl)

    if not any(image_path.exists() for image_path in hyspec_paths):
        logger.warning("None of the listed hyperspectral files exist")
        return
    if not any(geotrans_path.exists() for geotrans_path in geotransform_paths):
        logger.info("No geotransform files exist for the hyperspectral images.")

    for hyspec_path, geotrans_path, geotiff_path in zip(
        hyspec_paths, geotransform_paths, geotiff_paths
    ):
        if geotiff_path.exists() and not geotiff_overwrite:
            logger.info(f"Image {geotiff_path.name} exists - skipping.")
            continue

        if hyspec_path.exists():
            logger.info(f"Exporting RGB GeoTIFF from {hyspec_path.name}.")

            # Preferred path: explicit geotransform file
            if geotrans_path.exists():
                logger.info(f"Using geotransform in file {geotrans_path.name}")
                try:
                    georeferencer.georeference_hyspec_save_geotiff(
                        hyspec_path,
                        geotrans_path,
                        geotiff_path,
                    )
                except Exception:
                    logger.exception(
                        f"Error occured while georeferencing RGB version of {hyspec_path}"
                    )

            # Fallback: georeference via map info embedded in the ENVI header
            elif header_contains_mapinfo(hyspec_path):
                logger.info("Generating GeoTiff using ENVI header map info and rasterio")
                try:
                    georeferenced_hyspec_to_rgb_geotiff(
                        hyspec_path,
                        geotiff_path,
                        rgb_wl=rgb_wl,
                    )
                except Exception:
                    logger.exception(
                        f"Error occured while creating RGB version of {hyspec_path}"
                    )

            else:
                logger.error("No ENVI header map info or geotransform file - skipping.")
                continue

create_radiance_rgb_geotiff()

Create georeferenced RGB GeoTIFF versions of radiance

Source code in massipipe/pipeline.py
def create_radiance_rgb_geotiff(self) -> None:
    """Export georeferenced RGB GeoTIFF files for all radiance images"""
    logger.info("---- EXPORTING RGB GEOTIFF FOR RADIANCE ----")
    self.radiance_rgb_dir.mkdir(exist_ok=True)
    config = self.config
    self._create_rgb_geotiff(
        self.rad_im_paths,
        self.geotransform_paths,
        self.rad_rgb_paths,
        config.radiance_rgb.overwrite,
        config.general.rgb_wl,
    )

convert_raw_spectra_to_irradiance()

Convert raw spectra (DN) to irradiance (W/(m2*nm))

Source code in massipipe/pipeline.py
def convert_raw_spectra_to_irradiance(self) -> None:
    """Convert raw spectra (DN) to irradiance (W/(m2*nm))

    Missing raw spectra (None entries or non-existent files) are skipped.
    Existing irradiance files are skipped unless overwriting is enabled,
    and a failure on one spectrum does not stop processing of the rest.
    """
    logger.info("---- IRRADIANCE CONVERSION ----")
    self.radiance_dir.mkdir(exist_ok=True)
    irradiance_converter = IrradianceConverter(self.irradiance_calibration_file)
    for raw_spec_path, irrad_spec_path in zip(self.raw_spec_paths, self.irrad_spec_paths):
        if irrad_spec_path.exists() and not self.config.irradiance.overwrite:
            logger.info(f"Image {irrad_spec_path.name} exists - skipping.")
            continue

        if raw_spec_path is not None and raw_spec_path.exists():
            logger.info(f"Converting {raw_spec_path.name} to downwelling irradiance")
            try:
                irradiance_converter.convert_raw_file_to_irradiance(
                    raw_spec_path, irrad_spec_path
                )
            except Exception:
                # Log with traceback and continue with the next spectrum
                logger.exception(f"Irradiance conversion failed for {raw_spec_path.name}")
                continue

calibrate_irradiance_wavelengths()

Calibrate irradiance wavelengths using Fraunhofer absorption lines

Source code in massipipe/pipeline.py
def calibrate_irradiance_wavelengths(self) -> None:
    """Calibrate irradiance wavelengths using Fraunhofer absorption lines

    Fits a wavelength calibration to the whole batch of irradiance spectra,
    then updates the wavelength information in each spectrum header in place.

    Raises
    ------
    FileNotFoundError
        If the radiance folder (which holds the irradiance spectra) is missing.
    """
    logger.info("---- IRRADIANCE WAVELENGTH CALIBRATION ----")
    if not self.radiance_dir.exists():
        raise FileNotFoundError("Radiance folder with irradiance spectra does not exist")
    wavelength_calibrator = WavelengthCalibrator()
    irradiance_spec_paths = sorted(self.radiance_dir.glob("*.spec.hdr"))
    if irradiance_spec_paths:
        wavelength_calibrator.fit_batch(irradiance_spec_paths)
        for irradiance_spec_path in irradiance_spec_paths:
            logger.info(f"Calibrating wavelengths for {irradiance_spec_path.name}")
            try:
                wavelength_calibrator.update_header_wavelengths(irradiance_spec_path)
            except Exception:
                # Log with traceback and continue with the next spectrum
                logger.exception(
                    f"Wavelength calibration failed for {irradiance_spec_path.name}"
                )
                continue

glint_correct_radiance_images()

Remove water surface reflections of sun and sky light

Source code in massipipe/pipeline.py
def glint_correct_radiance_images(self) -> None:
    """Remove water surface reflections of sun and sky light

    Fits a Hedley-style glint correction model to user-specified reference
    images and applies it to every radiance image. Aborts (with an error log)
    if no reference images are configured, and skips all work if every output
    already exists and overwriting is disabled.

    Raises
    ------
    ValueError
        If the numbers of reference image numbers and reference image
        ranges in the configuration do not match.
    """
    logger.info("---- RADIANCE GLINT CORRECTION ----")
    self.radiance_gc_dir.mkdir(exist_ok=True)

    # Read glint correction reference information from config
    ref_im_nums = self.config.radiance_gc.reference_image_numbers
    ref_im_ranges = self.config.radiance_gc.reference_image_ranges

    if not ref_im_nums:
        logger.error("No reference images for sun glint correction specified - aborting.")
        return

    if (ref_im_ranges is not None) and (len(ref_im_nums) != len(ref_im_ranges)):
        raise ValueError(
            "The number of reference image numbers and reference image ranges do not match."
        )

    if (
        all(rp.exists() for rp in self.rad_gc_im_paths)
        and not self.config.radiance_gc.overwrite
    ):
        logger.info("Glint corrected radiance images already exist - skipping")
        return

    # Fit glint corrector
    ref_im_paths = [self.rad_im_paths[im_num] for im_num in ref_im_nums]
    glint_corrector = HedleyGlintCorrector(
        smooth_spectra=self.config.radiance_gc.smooth_spectra,
        subtract_dark_spec=self.config.radiance_gc.subtract_dark_spec,
        set_negative_values_to_zero=self.config.radiance_gc.set_negative_values_to_zero,
    )
    logger.info(f"Fitting glint correction model based on image numbers {ref_im_nums}")
    glint_corrector.fit_to_reference_images(ref_im_paths, ref_im_ranges)

    # Run glint correction
    for rad_image, rad_gc_image in zip(self.rad_im_paths, self.rad_gc_im_paths):
        if rad_gc_image.exists() and not self.config.radiance_gc.overwrite:
            logger.info(f"Image {rad_gc_image.name} exists - skipping.")
            continue
        if rad_image.exists():
            logger.info(f"Running glint correction for {rad_image.name}")
            try:
                glint_corrector.glint_correct_image_file(rad_image, rad_gc_image)
            except Exception:
                # Log with traceback and continue with the next image
                logger.exception(f"Glint correction failed for {rad_image.name}")
                continue

create_glint_corrected_radiance_rgb_geotiff()

Create georeferenced GeoTIFF versions of glint corrected radiance

Source code in massipipe/pipeline.py
def create_glint_corrected_radiance_rgb_geotiff(self) -> None:
    """Export georeferenced RGB GeoTIFF files for glint corrected radiance"""
    logger.info("---- EXPORTING RGB GEOTIFF FOR GLINT CORRECTED RADIANCE ----")
    self.radiance_gc_rgb_dir.mkdir(exist_ok=True)
    config = self.config
    self._create_rgb_geotiff(
        self.rad_gc_im_paths,
        self.geotransform_paths,
        self.rad_gc_rgb_paths,
        config.radiance_gc_rgb.overwrite,
        config.general.rgb_wl,
    )

convert_radiance_images_to_reflectance()

Convert radiance images (microflicks) to reflectance (unitless)

Source code in massipipe/pipeline.py
def convert_radiance_images_to_reflectance(self) -> None:
    """Convert radiance images (microflicks) to reflectance (unitless)

    Pairs each radiance image with its irradiance spectrum; pairs where
    either file is missing are skipped. Existing reflectance images are
    skipped unless overwriting is enabled.

    Raises
    ------
    FileNotFoundError
        If no radiance images or no irradiance spectra exist at all.
    """
    logger.info("---- REFLECTANCE CONVERSION ----")
    self.reflectance_dir.mkdir(exist_ok=True)
    reflectance_converter = ReflectanceConverter(
        wl_min=self.config.reflectance.wl_min,
        wl_max=self.config.reflectance.wl_max,
        conv_irrad_with_gauss=self.config.reflectance.conv_irrad_with_gauss,
        smooth_spectra=self.config.reflectance.smooth_spectra,
        refl_from_mean_irrad=self.config.reflectance.refl_from_mean_irrad,
        irrad_spec_paths=self.irrad_spec_paths,
    )

    if not any(rp.exists() for rp in self.rad_im_paths):
        raise FileNotFoundError(f"No radiance images found in {self.radiance_dir}")
    if not any(irp.exists() for irp in self.irrad_spec_paths):
        raise FileNotFoundError(f"No irradiance spectra found in {self.radiance_dir}")

    for rad_path, irrad_path, refl_path in zip(
        self.rad_im_paths, self.irrad_spec_paths, self.refl_im_paths
    ):
        if refl_path.exists() and not self.config.reflectance.overwrite:
            logger.info(f"Image {refl_path.name} exists - skipping.")
            continue
        if rad_path.exists() and irrad_path.exists():
            logger.info(f"Converting {rad_path.name} to reflectance.")
            try:
                reflectance_converter.convert_radiance_file_to_reflectance(
                    rad_path, irrad_path, refl_path
                )
            except Exception:
                # Log with traceback and continue with the next image
                logger.exception(
                    f"Error occured while converting {rad_path.name} to reflectance"
                )

add_irradiance_to_radiance_header()

Pre-process irradiance for reflectance calc. and save to radiance header

Source code in massipipe/pipeline.py
def add_irradiance_to_radiance_header(self) -> None:
    """Pre-process irradiance for reflectance calc. and save to radiance header

    Pairs where either the radiance image or the irradiance spectrum is
    missing are skipped.

    Raises
    ------
    FileNotFoundError
        If no radiance images or no irradiance spectra exist at all.
    """
    logger.info("---- WRITING IRRADIANCE TO RADIANCE HEADER ----")
    reflectance_converter = ReflectanceConverter(
        wl_min=None,  # Not used, but set to None to show that ...
        wl_max=None,  # ... original radiance wavelengths are not modified.
        conv_irrad_with_gauss=self.config.reflectance.conv_irrad_with_gauss,
        smooth_spectra=self.config.reflectance.smooth_spectra,
        refl_from_mean_irrad=self.config.reflectance.refl_from_mean_irrad,
        irrad_spec_paths=self.irrad_spec_paths,
    )

    if not any(rp.exists() for rp in self.rad_im_paths):
        raise FileNotFoundError(f"No radiance images found in {self.radiance_dir}")
    if not any(irp.exists() for irp in self.irrad_spec_paths):
        raise FileNotFoundError(f"No irradiance spectra found in {self.radiance_dir}")

    for rad_path, irrad_path in zip(self.rad_im_paths, self.irrad_spec_paths):
        if rad_path.exists() and irrad_path.exists():
            logger.info(f"Writing irradiance to radiance header {rad_path.name}.")
            try:
                reflectance_converter.add_irradiance_spectrum_to_header(rad_path, irrad_path)
            except Exception:
                # Log with traceback and continue with the next image
                logger.exception(
                    f"Error occured while adding irradiance spectrum to {rad_path.name}"
                )

add_mapinfo_to_radiance_header()

Add ENVI mapinfo (geotransform) to radiance header

Source code in massipipe/pipeline.py
def add_mapinfo_to_radiance_header(self) -> None:
    """Add ENVI mapinfo (geotransform) to radiance header

    Raises
    ------
    FileNotFoundError
        If no radiance images or no geotransform JSON files exist at all.
    """
    logger.info("---- WRITING MAP INFO TO RADIANCE HEADER ----")
    if not any(rp.exists() for rp in self.rad_im_paths):
        raise FileNotFoundError(f"No radiance images found in {self.radiance_dir}")
    if not any(gtp.exists() for gtp in self.geotransform_paths):
        raise FileNotFoundError(f"No geotransform JSON files found in {self.geotransform_dir}")
    for rad_path, geotrans_path in zip(self.rad_im_paths, self.geotransform_paths):
        logger.info(f"Adding map info to {rad_path.name}")
        try:
            add_header_mapinfo(rad_path, geotrans_path)
        except Exception:
            # Log with traceback and continue with the next image
            logger.exception(f"Error adding map info to {rad_path.name}")
            continue

glint_correct_reflectance_images()

Correct for sun and sky glint in reflectance images

Source code in massipipe/pipeline.py
def glint_correct_reflectance_images(self) -> None:
    """Correct for sun and sky glint in reflectance images

    Two methods are supported, selected via config.reflectance_gc.method:
    - "from_rad_gc": compute reflectance from glint corrected radiance,
      using the irradiance spectrum stored in each radiance header.
    - "flat_spec": apply a flat-glint-spectrum correction directly to
      existing (uncorrected) reflectance images.

    Raises
    ------
    FileNotFoundError
        If method is "flat_spec" and no reflectance images exist.
    """
    logger.info("---- REFLECTANCE GLINT CORRECTION ----")
    self.reflectance_gc_dir.mkdir(exist_ok=True)

    # Calculate reflectance based on glint corrected radiance
    if self.config.reflectance_gc.method == "from_rad_gc":
        logger.info("Calculating glint corrected reflectance from glint corrected radiance")
        reflectance_converter = ReflectanceConverter(
            wl_min=self.config.reflectance.wl_min,
            wl_max=self.config.reflectance.wl_max,
            smooth_spectra=self.config.reflectance_gc.smooth_spectra,
        )

        if not any(rp.exists() for rp in self.rad_gc_im_paths):
            logger.warning(
                f"No glint corrected radiance images found in {self.radiance_gc_dir}"
            )

        for rad_gc_path, refl_gc_path in zip(self.rad_gc_im_paths, self.refl_gc_im_paths):
            if refl_gc_path.exists() and not self.config.reflectance_gc.overwrite:
                logger.info(f"Image {refl_gc_path.name} exists - skipping.")
                continue
            if rad_gc_path.exists():
                logger.info(f"Calculating reflectance based on {rad_gc_path.name}.")
                try:
                    reflectance_converter.convert_radiance_file_with_irradiance_to_reflectance(
                        rad_gc_path, refl_gc_path
                    )
                except KeyError:
                    # Radiance header lacks the embedded irradiance spectrum
                    logger.error(f"Irradiance spectrum missing in {rad_gc_path}")
                    continue
                except Exception:
                    logger.exception(
                        f"Glint corrected reflectance from rad_gc failed for {rad_gc_path.name}"
                    )
                    continue

    # Calculate glint corrected reflectance based on assumption of flat glint spectrum
    elif self.config.reflectance_gc.method == "flat_spec":
        logger.info("Using flat spectrum method for reflectance glint correction")
        glint_corrector = FlatSpecGlintCorrector(
            smooth_with_savitsky_golay=self.config.reflectance_gc.smooth_spectra
        )

        if not any(rp.exists() for rp in self.refl_im_paths):
            raise FileNotFoundError(f"No reflectance images found in {self.reflectance_dir}")

        for refl_path, refl_gc_path in zip(self.refl_im_paths, self.refl_gc_im_paths):
            if refl_gc_path.exists() and not self.config.reflectance_gc.overwrite:
                logger.info(f"Image {refl_gc_path.name} exists - skipping.")
                continue
            if refl_path.exists():
                logger.info(f"Applying glint correction to {refl_path.name}.")
                try:
                    glint_corrector.glint_correct_image_file(refl_path, refl_gc_path)
                except Exception:
                    logger.exception(f"Flat spec glint correction failed for {refl_path.name}")
                    continue
    else:
        logger.error("Unrecognized glint correction method specified in configuration.")

create_glint_corrected_reflectance_rgb_geotiff()

Create georeferenced GeoTIFF versions of glint corrected reflectance

Source code in massipipe/pipeline.py
def create_glint_corrected_reflectance_rgb_geotiff(self) -> None:
    """Export georeferenced RGB GeoTIFF files for glint corrected reflectance"""
    logger.info("---- EXPORTING RGB GEOTIFF FOR GLINT CORRECTED REFLECTANCE ----")
    self.reflectance_gc_rgb_dir.mkdir(exist_ok=True)
    config = self.config
    # NOTE(review): this uses config.reflectance_gc.overwrite, whereas the
    # radiance variants use a dedicated *_rgb.overwrite flag - confirm intended.
    self._create_rgb_geotiff(
        self.refl_gc_im_paths,
        self.geotransform_paths,
        self.refl_gc_rgb_paths,
        config.reflectance_gc.overwrite,
        config.general.rgb_wl,
    )

_mosaic_geotiffs(geotiff_paths, mosaic_path, mosaic_overwrite, overview_factors, convert_to_8bit=True) staticmethod

Mosaic GeoTIFF images into single GeoTIFF with overviews

Parameters:

Name Type Description Default
geotiff_paths list[Path]

List of GeoTIFFs to mosaic

required
mosaic_path Path

Path to output GeoTIFF

required
mosaic_overwrite bool

Whether to overwrite existing output GeoTIFF.

required
overview_factors Sequence[int]

GeoTIFF overview factors, typically powers of 2. Used to add a "pyramid" of lower-resolution images for faster image browsing.

required
convert_to_8bit bool

If true, the mosaic is percentile stretched and converted to 8-bit. This decreases file size and enhances contrast, at the cost of losing the (physical) units of the original image.

True
Source code in massipipe/pipeline.py
@staticmethod
def _mosaic_geotiffs(
    geotiff_paths: list[Path],
    mosaic_path: Path,
    mosaic_overwrite: bool,
    overview_factors: Sequence[int],
    convert_to_8bit: bool = True,
) -> None:
    """Mosaic GeoTIFF images into single GeoTIFF with overviews

    Parameters
    ----------
    geotiff_paths : list[Path]
        List of GeoTIFFs to mosaic
    mosaic_path : Path
        Path to output GeoTIFF
    mosaic_overwrite : bool
        Whether to overwrite existing output GeoTIFF.
    overview_factors : Sequence[int]
        GeoTIFF overview factors, typically powers of 2.
        Used to add "pyramid" of lower-resolution images for faster image browsing.
    convert_to_8bit : bool, optional
        If true, the mosaic is percentile stretched and converted to 8-bit.
        This decreases file size and enhances contrast, at the cost of losing the
        (physical) units of the original image.
    """
    if mosaic_path.exists() and not mosaic_overwrite:
        logger.info(f"Mosaic {mosaic_path.name} already exists - skipping.")
        return

    if not any(gtp.exists() for gtp in geotiff_paths):
        logger.warning("None of the listed GeoTIFFs exist.")
        return

    mosaic_geotiffs(geotiff_paths, mosaic_path)
    if convert_to_8bit:
        # In-place conversion: output path is the same as the input path
        convert_geotiff_to_8bit(input_image_path=mosaic_path, output_image_path=mosaic_path)
    add_geotiff_overviews(mosaic_path, overview_factors)

mosaic_radiance_geotiffs()

Merge radiance RGB images into mosaic with overviews

Source code in massipipe/pipeline.py
def mosaic_radiance_geotiffs(self) -> None:
    """Merge radiance RGB images into mosaic with overviews"""
    logger.info("---- MOSAICING RADIANCE ----")
    self.mosaic_dir.mkdir(exist_ok=True)
    mosaic_config = self.config.mosaic
    self._mosaic_geotiffs(
        self.rad_rgb_paths,
        self.mosaic_rad_path,
        mosaic_config.radiance_rgb.overwrite,
        mosaic_config.overview_factors,
    )

mosaic_radiance_gc_geotiffs()

Merge radiance_gc RGB images into mosaic with overviews

Source code in massipipe/pipeline.py
def mosaic_radiance_gc_geotiffs(self) -> None:
    """Merge radiance_gc RGB images into mosaic with overviews"""
    logger.info("---- MOSAICING GLINT CORRECTED RADIANCE ----")
    self.mosaic_dir.mkdir(exist_ok=True)
    mosaic_config = self.config.mosaic
    self._mosaic_geotiffs(
        self.rad_gc_rgb_paths,
        self.mosaic_rad_gc_path,
        mosaic_config.radiance_gc_rgb.overwrite,
        mosaic_config.overview_factors,
    )

mosaic_reflectance_gc_geotiffs()

Merge reflectance_gc RGB images into mosaic with overviews

Source code in massipipe/pipeline.py
def mosaic_reflectance_gc_geotiffs(self) -> None:
    """Merge reflectance_gc RGB images into mosaic with overviews"""
    logger.info("---- MOSAICING GLINT CORRECTED REFLECTANCE ----")
    self.mosaic_dir.mkdir(exist_ok=True)
    mosaic_config = self.config.mosaic
    self._mosaic_geotiffs(
        self.refl_gc_rgb_paths,
        self.mosaic_refl_gc_path,
        mosaic_config.reflectance_gc_rgb.overwrite,
        mosaic_config.overview_factors,
    )

delete_existing_products(delete_quicklook=True, delete_radiance=True, delete_radiance_gc=True, delete_reflectance=True, delete_reflectance_gc=True, delete_geotransform=True, delete_imudata=True, delete_mosaics=True)

Delete existing image products ("reset" after previous processing)

Parameters:

Name Type Description Default
delete_quicklook bool

If true, delete 0b_quicklook folder (if it exists)

True
delete_radiance bool

If true, delete 1a_radiance folder (if it exists) If data "starting point" is radiance and not raw files, radiance will not be deleted (delete manually if needed).

True
delete_radiance_gc bool

If true, delete 1b_radiance folder (if it exists)

True
delete_reflectance bool

If true, delete 2a_reflectance folder (if it exists)

True
delete_reflectance_gc bool

If true, delete 2b_reflectance folder (if it exists)

True
delete_geotransform bool

If true, delete geotransform folder (if it exists)

True
delete_imudata bool

If true, delete imudata folder (if it exists) If data "starting point" is radiance and not raw files, imu data will not be deleted (delete manually if needed).

True
delete_mosaics bool

If true, delete mosaics folder (if it exists)

True
Source code in massipipe/pipeline.py
def delete_existing_products(
    self,
    delete_quicklook: bool = True,
    delete_radiance: bool = True,
    delete_radiance_gc: bool = True,
    delete_reflectance: bool = True,
    delete_reflectance_gc: bool = True,
    delete_geotransform: bool = True,
    delete_imudata: bool = True,
    delete_mosaics: bool = True,
) -> None:
    """Delete existing image products ("reset" after previous processing)

    Parameters
    ----------
    delete_quicklook : bool, optional
        If true, delete 0b_quicklook folder (if it exists)
    delete_radiance : bool, optional
        If true, delete 1a_radiance folder (if it exists)
        If data "starting point" is radiance and not raw files,
        radiance will not be deleted (delete manually if needed).
    delete_radiance_gc : bool, optional
        If true, delete 1b_radiance folder (if it exists)
    delete_reflectance : bool, optional
        If true, delete 2a_reflectance folder (if it exists)
    delete_reflectance_gc : bool, optional
        If true, delete 2b_reflectance folder (if it exists)
    delete_geotransform : bool, optional
        If true, delete geotransform folder (if it exists)
    delete_imudata : bool, optional
        If true, delete imudata folder (if it exists)
        If data "starting point" is radiance and not raw files,
        imu data will not be deleted (delete manually if needed).
    delete_mosaics : bool, optional
        If true, delete mosaics folder (if it exists)
    """
    # Radiance and IMU data are inputs (not products) when processing
    # starts from radiance, so they are protected from deletion then.
    inputs_are_radiance = self.data_starting_point == "radiance"
    deletion_plan = [
        (self.quicklook_dir, delete_quicklook),
        (self.radiance_dir, delete_radiance and not inputs_are_radiance),
        (self.radiance_gc_dir, delete_radiance_gc),
        (self.reflectance_dir, delete_reflectance),
        (self.reflectance_gc_dir, delete_reflectance_gc),
        (self.geotransform_dir, delete_geotransform),
        (self.imudata_dir, delete_imudata and not inputs_are_radiance),
        (self.mosaic_dir, delete_mosaics),
    ]
    for folder, should_delete in deletion_plan:
        if folder.exists() and should_delete:
            logger.info(f"Deleting {folder}")
            shutil.rmtree(folder)

run_quicklook()

Create quicklook versions of images (percentile stretched)

Source code in massipipe/pipeline.py
def run_quicklook(self) -> None:
    """Create quicklook versions of images (percentile stretched)"""
    # Skip entirely when quicklook generation is disabled in the config.
    if not self.config.quicklook.create:
        return
    try:
        self.create_quicklook_images()
    except Exception:
        # Best-effort step: log the failure and continue the pipeline.
        logger.exception("Error while creating quicklook images")

run_raw_data_processing()

Run all data processing steps based on raw data

Source code in massipipe/pipeline.py
def run_raw_data_processing(self) -> None:
    """Run all data processing steps based on raw data"""

    def _attempt(action, failure_message):
        # Run one processing step; log (but never propagate) any failure,
        # so that later steps still get a chance to run.
        try:
            action()
        except Exception:
            logger.exception(failure_message)

    if self.config.imu_data.create:
        _attempt(self.parse_and_save_imu_data, "Error while parsing and saving IMU data")

    if self.config.radiance.create:
        _attempt(
            self.convert_raw_images_to_radiance,
            "Error while converting raw images to radiance",
        )

    if self.config.irradiance.create:
        # Irradiance conversion and wavelength calibration are attempted
        # independently: a failure in the first does not skip the second.
        _attempt(
            self.convert_raw_spectra_to_irradiance,
            "Error while converting raw spectra to irradiance",
        )
        _attempt(
            self.calibrate_irradiance_wavelengths,
            "Error while calibrating irradiance wavelengths",
        )

run_glint_correction()

Run glint correction using parameters defined in YAML file

The processing steps include: - Fitting a glint correction model to radiance images - Running glint correction and saving glint corrected images - Creating RGB GeoTiff versions of glint corrected images

See massipipe.config.Config and template YAML file for all options.

Source code in massipipe/pipeline.py
def run_glint_correction(self) -> None:
    """Run glint correction using parameters defined in YAML file

    The processing steps include:
        - Fitting a glint correction model to radiance images
        - Running glint correction and saving glint corrected images
        - Creating RGB GeoTiff versions of glint corrected images

    Each step is guarded by its config flag; failures are logged and
    swallowed so that later steps still run.

    See massipipe.config.Config and template YAML file for all options.

    """

    if self.config.radiance_gc.create:
        try:
            self.glint_correct_radiance_images()
        except Exception:
            logger.exception("Error while glint correcting radiance images")

    if self.config.radiance_gc_rgb.create:
        try:
            self.create_glint_corrected_radiance_rgb_geotiff()
        except Exception:
            logger.exception("Error while creating RGB GeoTIFFs from glint corrected radiance")

    if self.config.reflectance_gc.create:
        try:
            self.glint_correct_reflectance_images()
        except Exception:
            # Fixed copy-paste error: message previously said "radiance"
            logger.exception("Error while glint correcting reflectance images")

    if self.config.reflectance_gc_rgb.create:
        try:
            self.create_glint_corrected_reflectance_rgb_geotiff()
        except Exception:
            # Fixed copy-paste error: message previously said "radiance"
            logger.exception(
                "Error while creating RGB GeoTIFFs from glint corrected reflectance"
            )

run_mosaics()

Run all mosaicing operations

Source code in massipipe/pipeline.py
def run_mosaics(self) -> None:
    """Run all mosaicing operations

    Each mosaic type is guarded by its config flag; failures are logged
    (with traceback) and swallowed so that other mosaics are still built.
    """

    if self.config.mosaic.radiance_rgb.create:
        try:
            self.mosaic_radiance_geotiffs()
        except Exception:
            # logger.exception already records the traceback; the previous
            # exc_info=True argument was redundant.
            logger.exception("Error occurred while mosaicing radiance")

    if self.config.mosaic.radiance_gc_rgb.create:
        try:
            self.mosaic_radiance_gc_geotiffs()
        except Exception:
            # Changed logger.error -> logger.exception so the traceback is
            # kept, consistent with the other mosaic steps.
            logger.exception("Error occurred while mosaicing glint corrected radiance")

    if self.config.mosaic.reflectance_gc_rgb.create:
        try:
            self.mosaic_reflectance_gc_geotiffs()
        except Exception:
            logger.exception("Error occurred while mosaicing glint corrected reflectance")

run()

Run all processing steps

Source code in massipipe/pipeline.py
def run(self) -> None:
    """Run all processing steps"""
    self.run_quicklook()
    # Raw-data processing only applies when the dataset starts from raw files.
    if self.data_starting_point == "raw":
        self.run_raw_data_processing()
    # Remaining stages always run, in fixed order.
    for stage in (
        self.run_secondary_processing,
        self.run_glint_correction,
        self.run_mosaics,
    ):
        stage()

export()

Export dataset to ZIP file for archival / publishing

Source code in massipipe/pipeline.py
def export(self) -> None:
    """Export dataset to ZIP file for archival / publishing"""
    # Select the "best" mosaic for visualization and copy it to its own folder
    if self.config.mosaic.visualization_mosaic == "radiance":
        mosaic_path = self.mosaic_rad_path
    else:
        mosaic_path = self.mosaic_rad_gc_path
    copy_visualization_mosaic(mosaic_path, self.mosaic_visualization_dir)

    # Bundle the selected processed data into a single ZIP archive
    zip_file_path = export_dataset_zip(
        self.dataset_dir,
        self.quicklook_dir,
        self.radiance_dir,
        self.imudata_dir,
        self.mosaic_visualization_dir,
        self.config_file_path,
    )
    logger.info(f"Dataset exported to {zip_file_path}")

QuickLookProcessor

Source code in massipipe/quicklook.py
class QuickLookProcessor:
    """Create percentile-stretched RGB "quicklook" previews of hyperspectral images."""

    def __init__(
        self,
        rgb_wl: Optional[Tuple[float, float, float]] = None,
        percentiles: Optional[Tuple[float, float]] = None,
    ):
        """Initialize quicklook processor.

        Parameters
        ----------
        rgb_wl : Optional[Tuple[float, float, float]], optional
            Wavelengths (in nm) for generating RGB images.
            If None, uses default (640.0, 550.0, 460.0).
        percentiles : Optional[Tuple[float, float]], optional
            Percentile limits for percentile stretching of images.
            If None, uses default (2, 98).
        """
        self.rgb_wl = rgb_wl if rgb_wl else (640.0, 550.0, 460.0)
        self.percentiles = percentiles if percentiles else (2, 98)

    def create_quicklook_image_file(
        self, raw_path: Union[Path, str], quicklook_path: Union[Path, str]
    ) -> None:
        """Create per-band percentile stretched RGB image file from raw hyperspectral image.

        Parameters
        ----------
        raw_path : Union[Path, str]
            Path to raw hyperspectral image (header file).
        quicklook_path : Union[Path, str]
            Path to output PNG image file.

        Raises
        ------
        Exception
            Any error while reading, stretching or saving is logged and re-raised.
        """
        try:
            image, wl, _ = mpu.read_envi(raw_path)
            rgb_image, _ = mpu.rgb_subset_from_hsi(image, wl, rgb_target_wl=self.rgb_wl)
            rgb_image = mpu.percentile_stretch_image(rgb_image, percentiles=self.percentiles)
            mpu.save_png(rgb_image, quicklook_path)
        except Exception:
            # logger.exception records the traceback (replaces the redundant
            # exc_info=True); the previously bound "as e" name was unused.
            logger.exception(f"Error creating quicklook image file from {raw_path}")
            raise

__init__(rgb_wl=None, percentiles=None)

Parameters:

Name Type Description Default
rgb_wl Optional[Tuple[float, float, float]]

Wavelengths (in nm) for generating RGB images. If None, uses default (640.0, 550.0, 460.0).

None
percentiles Optional[Tuple[float, float]]

Percentile limits for percentile stretching of images. If None, uses default (2, 98).

None
Source code in massipipe/quicklook.py
def __init__(
    self,
    rgb_wl: Optional[Tuple[float, float, float]] = None,
    percentiles: Optional[Tuple[float, float]] = None,
):
    """Set up quicklook processing parameters.

    Parameters
    ----------
    rgb_wl : Optional[Tuple[float, float, float]], optional
        Wavelengths (in nm) used to select the R, G and B bands.
        Defaults to (640.0, 550.0, 460.0) when not given.
    percentiles : Optional[Tuple[float, float]], optional
        Lower/upper percentile limits for contrast stretching.
        Defaults to (2, 98) when not given.
    """
    default_rgb_wl = (640.0, 550.0, 460.0)
    default_percentiles = (2, 98)
    self.rgb_wl = rgb_wl if rgb_wl else default_rgb_wl
    self.percentiles = percentiles if percentiles else default_percentiles

create_quicklook_image_file(raw_path, quicklook_path)

Create per-band percentile stretched RGB image file from raw hyperspectral image.

Parameters:

Name Type Description Default
raw_path Union[Path, str]

Path to raw hyperspectral image (header file).

required
quicklook_path Union[Path, str]

Path to output PNG image file.

required
Source code in massipipe/quicklook.py
def create_quicklook_image_file(
    self, raw_path: Union[Path, str], quicklook_path: Union[Path, str]
) -> None:
    """Create per-band percentile stretched RGB image file from raw hyperspectral image.

    Parameters
    ----------
    raw_path : Union[Path, str]
        Path to raw hyperspectral image (header file).
    quicklook_path : Union[Path, str]
        Path to output PNG image file.

    Raises
    ------
    Exception
        Any error while reading, stretching or saving is logged and re-raised.
    """
    try:
        image, wl, _ = mpu.read_envi(raw_path)
        rgb_image, _ = mpu.rgb_subset_from_hsi(image, wl, rgb_target_wl=self.rgb_wl)
        rgb_image = mpu.percentile_stretch_image(rgb_image, percentiles=self.percentiles)
        mpu.save_png(rgb_image, quicklook_path)
    except Exception:
        # logger.exception records the traceback (replaces the redundant
        # exc_info=True); the previously bound "as e" name was unused.
        logger.exception(f"Error creating quicklook image file from {raw_path}")
        raise

RadianceConverter

A class for converting raw hyperspectral images from Pika L cameras to radiance.

Attributes:

Name Type Description
radiance_calibration_file Union[Path, str]

Path to Imager Calibration Pack (*.icp) file.

rc_dataset RadianceCalibrationDataset

A radiance calibration dataset object representing the calibration data supplied in the *.icp file.

rad_conv_frame NDArray

Frame representing conversion factors from raw data to radiance for every pixel and wavelength.

rad_conv_metadata dict

ENVI metadata for rad_conv_frame.

Methods:

Name Description
convert_raw_image_to_radiance

Convert single image (3D array) from raw to radiance.

convert_raw_file_to_radiance

Read raw file, convert, and save as radiance image.

Notes

Most image sensors register some amount of "dark current", i.e. a signal which is present even though no photons enter the sensor. The dark current should be subtracted for the measurement to be as accurate as possible. The amount of dark current also depends on camera gain (signal amplification) and camera shutter (time available for the sensor to collect photons). The *.icp calibration file (ICP = "Imager Calibration Pack") is a ZIP archive which includes dark current measurements taken at different gain and shutter settings (raw images). To remove dark current from a new image, this set of dark current frames is searched, and the one which best matches the gain and shutter values of the new image is used as a reference. The dark frame needs to be scaled to account for differences in binning, gain and shutter between it and the new image. The dark frame is then subtracted from the image.

To convert raw images from digital numbers to radiance, a physical quantity, the raw images are multiplied with a "radiance conversion frame". This frame represents "microflicks per digital number" for every pixel and every spectral channel, where the spectral radiance unit "flick" is defined as "watts per steradian per square centimeter of surface per micrometer of span in wavelength" (see https://en.wikipedia.org/wiki/Flick_(physics) ). The radiance conversion frame also needs to be scaled to account for differences in binning, gain and shutter.

When both the dark current frame (DCF) and the radiance conversion frame (RCF) have been scaled to match the raw input image (IM), the radiance output image (OI) is given by OI = (IM-DCF)*RCF

Note that the conversion assumes that the camera response is completely linear after dark current is removed. This may not be completely accurate, but is assumed to be within an acceptable margin of error.

Source code in massipipe/radiance.py
class RadianceConverter:
    """A class for converting raw hyperspectral images from Pika L cameras to radiance.

    Attributes
    ----------
    radiance_calibration_file: Union[Path, str]
        Path to Imager Calibration Pack (*.icp) file.
    rc_dataset: RadianceCalibrationDataset
        A radiance calibration dataset object representing the calibration
        data supplied in the *.icp file.
    rad_conv_frame: NDArray
        Frame representing conversion factors from raw data to radiance for
        every pixel and wavelength.
    rad_conv_metadata: dict
        ENVI metadata for rad_conv_frame.

    Methods
    -------
    convert_raw_image_to_radiance(raw_image, raw_image_metadata)
        Convert single image (3D array) from raw to radiance.
    convert_raw_file_to_radiance(raw_header_path, radiance_header_path, interleave='bip')
        Read raw file, convert, and save as radiance image.

    Notes
    -----
    Most image sensors register some amount of "dark current", i.e. a signal
    which is present even though no photons enter the sensor. The dark current
    should be subtracted for the measurement to be as accurate as possible.
    The amount of dark current also depends on camera gain (signal amplification)
    and camera shutter (time available for the sensor to collect photons).
    The *.icp calibration file (ICP = "Imager Calibration Pack") is a ZIP archive
    which includes dark current measurements taken at different gain and shutter
    settings (raw images). To remove dark current from a new image, this set of
    dark current frames is searched, and the one which best matches the gain and
    shutter values of the new image is used as a reference. The dark frame
    needs to be scaled to account for differences in binning, gain and shutter
    between it and the new image. The dark frame is then subtracted from the image.

    To convert raw images from digital numbers to radiance, a physical quantity,
    the raw images are multiplied with a "radiance conversion frame". This frame
    represents "microflicks per digital number" for every pixel and every
    spectral channel, where the spectral radiance unit "flick" is defined as
    "watts per steradian per square centimeter of surface per micrometer of span
    in wavelength" (see https://en.wikipedia.org/wiki/Flick_(physics) ).
    The radiance conversion frame also needs to be scaled to account for differences
    in binning, gain and shutter.

    When both the dark current frame (DCF) and the radiance conversion frame (RCF)
    have been scaled to match the raw input image (IM), the radiance output image
    (OI) is given by OI = (IM-DCF)*RCF

    Note that the conversion assumes that the camera response is completely linear
    after dark current is removed. This may not be completely accurate, but is
    assumed to be within an acceptable margin of error.

    """

    def __init__(
        self,
        radiance_calibration_file: Union[Path, str],
        set_saturated_pixels_to_zero: Union[bool, None] = True,
        saturation_value: int = 2**12 - 1,
    ):
        """Create radiance converter object.

        Parameters
        ----------
        radiance_calibration_file: Union[Path, str]
            Path to Imager Calibration Pack (*.icp) file.
            The *.icp file is a zip archive, and the file will be unzipped into
            a subfolder in the same folder containing the *.icp file.
        set_saturated_pixels_to_zero: bool, optional
            If True, saturated pixels are set to all-zero (across all bands).
        saturation_value: int, optional
            Maximum digital number for camera sensor.
            Default 2**12 - 1 corresponds to a 12-bit sensor.

        """
        self.radiance_calibration_file = Path(radiance_calibration_file)
        self.rc_dataset = RadianceCalibrationDataset(calibration_file=radiance_calibration_file)
        self.set_saturated_pixels_to_zero = bool(set_saturated_pixels_to_zero)
        self.saturation_value = saturation_value
        # Placeholder values; populated by _get_rad_conv_frame() below.
        self.rad_conv_frame = np.array([])
        self.rad_conv_metadata = dict()
        self._get_rad_conv_frame()

    def _get_rad_conv_frame(self) -> None:
        """Read radiance conversion frame from file and save as attribute."""
        rad_conv_frame, _, rad_conv_metadata = self.rc_dataset.get_rad_conv_frame()
        # Calibration frame must be unbinned, full-resolution (900 x 600);
        # later scaling assumes these exact reference dimensions.
        if (
            rad_conv_metadata["sample binning"] != "1"
            or rad_conv_metadata["spectral binning"] != "1"
        ):
            raise ValueError("Radiance conversion frame must have binning of 1.")
        if rad_conv_metadata["samples"] != "900" or rad_conv_metadata["bands"] != "600":
            raise ValueError("Radiance conversion frame must have 900 samples and 600 bands.")
        self.rad_conv_frame = rad_conv_frame
        self.rad_conv_metadata = rad_conv_metadata

    def _get_best_matching_dark_frame(self, raw_image_metadata: dict) -> tuple[NDArray, dict]:
        """Get dark frame from calibration data that best matches input data."""
        dark_frame, _, dark_frame_metadata, _, _ = self.rc_dataset.get_closest_dark_frame(
            gain=float(raw_image_metadata["gain"]),
            shutter=float(raw_image_metadata["shutter"]),
        )
        return dark_frame, dark_frame_metadata

    def _scale_dark_frame(
        self,
        dark_frame: NDArray,
        dark_frame_metadata: dict,
        raw_image_metadata: dict,
    ) -> NDArray:
        """Scale dark frame to match binning for input image."""
        if (
            dark_frame_metadata["sample binning"] != "1"
            or dark_frame_metadata["spectral binning"] != "1"
        ):
            raise ValueError("Dark frame must have binning of 1.")
        binning_factor = float(raw_image_metadata["sample binning"]) * float(
            raw_image_metadata["spectral binning"]
        )
        dark_frame = mpu.bin_image(
            dark_frame,
            sample_bin_size=int(raw_image_metadata["sample binning"]),
            channel_bin_size=int(raw_image_metadata["spectral binning"]),
        )
        # NOTE(review): binned frame is additionally multiplied by the bin
        # area; assumes mpu.bin_image's default mode requires this rescaling
        # to represent summed dark counts — confirm against massipipe.utils.
        dark_frame = dark_frame * binning_factor
        # NOTE: Dark frame not scaled based on differences in gain and shutter because
        # the best matching dark frame has (approx.) the same values already.

        return dark_frame

    def _scale_rad_conv_frame(self, raw_image_metadata: dict) -> NDArray:
        """Scale radiance conversion frame to match input binning, gain and shutter."""
        # Scaling due to binning
        binning_factor = 1.0 / (
            float(raw_image_metadata["sample binning"])
            * float(raw_image_metadata["spectral binning"])
        )

        # Scaling due to gain differences (metadata gain is in dB: 10^(dB/20))
        rad_conv_gain = 10 ** (float(self.rad_conv_metadata["gain"]) / 20.0)
        input_gain = 10 ** (float(raw_image_metadata["gain"]) / 20.0)
        gain_factor = rad_conv_gain / input_gain

        # Scaling due to shutter differences
        rad_conv_shutter = float(self.rad_conv_metadata["shutter"])
        input_shutter = float(raw_image_metadata["shutter"])
        shutter_factor = rad_conv_shutter / input_shutter

        # Bin (average) radiance conversion frame to have same dimensions as input
        rad_conv_frame = mpu.bin_image(
            self.rad_conv_frame,
            sample_bin_size=int(raw_image_metadata["sample binning"]),
            channel_bin_size=int(raw_image_metadata["spectral binning"]),
            average=True,
        )

        # Combine factors and scale frame
        scaling_factor = binning_factor * gain_factor * shutter_factor
        rad_conv_frame = rad_conv_frame * scaling_factor

        return rad_conv_frame

    def convert_raw_image_to_radiance(
        self,
        raw_image: NDArray,
        raw_image_metadata: dict,
    ) -> NDArray:
        """Convert raw image (3D array) to radiance image.

        Parameters
        ----------
        raw_image: NDArray
            Raw hyperspectral image, shape (n_lines, n_samples, n_channels).
            The image is assumed to have been created by a Resonon Pika L
            camera, which has 900 spatial pixels x 600 spectral channels
            before any binning has been applied. Typically, spectral binning
            with a bin size of 2 is applied during image acquisition, resulting
            in images with shape (n_lines, 900, 300). It is assumed that no
            spectral or spatial (sample) cropping has been applied. Where binning
            has been applied, it is assumed that
                - n_samples*sample_bin_size = 900
                - n_channels*channel_bin_size = 600.
        raw_image_metadata: dict
            ENVI metadata formatted as dict.
            See spectral.io.envi.open().

        Returns
        -------
        NDArray
            Radiance image with same shape as raw image, with spectral radiance
            in units of microflicks = 1e-5 W/(sr*m2*nm). Microflicks are used
            to be consistent with Resonon formatting, and because microflick
            values typically are in a range suitable for (memory-efficient)
            encoding as 16-bit unsigned integer.

        Raises
        ------
        ValueError:
            In case the raw image does not have the expected dimensions.

        References
        ----------
        - ["flick" unit](https://en.wikipedia.org/wiki/Flick_(physics))
        """
        # Check input dimensions
        if int(raw_image_metadata["samples"]) * int(raw_image_metadata["sample binning"]) != 900:
            raise ValueError(
                "Sample count and binning does not correspond to "
                "900 samples in the original image."
            )
        if int(raw_image_metadata["bands"]) * int(raw_image_metadata["spectral binning"]) != 600:
            raise ValueError(
                "Spectral band count and binning does not correspond to "
                "600 spectral bands in the original image."
            )

        # Get dark frame and radiance conversion frames scaled to input image
        dark_frame, dark_frame_metadata = self._get_best_matching_dark_frame(raw_image_metadata)
        dark_frame = self._scale_dark_frame(dark_frame, dark_frame_metadata, raw_image_metadata)
        rad_conv_frame = self._scale_rad_conv_frame(raw_image_metadata)

        # Flip frames if necessary
        if ("flip radiometric calibration" in raw_image_metadata) and (
            raw_image_metadata["flip radiometric calibration"] == "True"
        ):
            dark_frame = np.flip(dark_frame, axis=1)
            rad_conv_frame = np.flip(rad_conv_frame, axis=1)

        # Subtract dark current and convert to radiance (microflicks)
        # Note: dark_frame and rad_conv_frame are implicitly "expanded" by broadcasting
        radiance_image = (raw_image - dark_frame) * rad_conv_frame

        # Set negative (non-physical) values to zero
        radiance_image[radiance_image < 0] = 0

        # Set saturated pixels to zero (optional)
        if self.set_saturated_pixels_to_zero:
            radiance_image[np.any(raw_image >= self.saturation_value, axis=2)] = 0

        # Convert to 16-bit integer format (more efficient for storage)
        # NOTE(review): astype(np.uint16) wraps values above 65535 — presumably
        # microflick values stay within range in practice; confirm.
        return radiance_image.astype(np.uint16)

    def convert_raw_file_to_radiance(
        self,
        raw_header_path: Union[Path, str],
        radiance_header_path: Union[Path, str],
        interleave: str = "bip",
    ) -> None:
        """Read raw image file, convert to radiance, and save to file.

        Parameters
        ----------
        raw_header_path: Path | str
            Path to raw hyperspectral image acquired with Resonon Pika L camera.
        radiance_header_path: Path | str
            Path to save converted radiance image to.
            The name of the header file should match the 'interleave' argument
            (default: bip), e.g. 'radiance_image.bip.hdr'.
        interleave: str, {'bip','bil','bsq'}, default 'bip'
            String indicating how binary image file is organized.
            See spectral.io.envi.save_image().

        Notes
        -----
        The radiance image is saved with the same metadata as the raw image.
        """
        raw_image, _, metadata = mpu.read_envi(raw_header_path)
        radiance_image = self.convert_raw_image_to_radiance(raw_image, metadata)

        mpu.save_envi(
            radiance_header_path,
            radiance_image,
            metadata,
            interleave=interleave,
        )

__init__(radiance_calibration_file, set_saturated_pixels_to_zero=True, saturation_value=2 ** 12 - 1)

Parameters:

Name Type Description Default
radiance_calibration_file Union[Path, str]

Path to Imager Calibration Pack (*.icp) file. The *.icp file is a zip archive, and the file will be unzipped into a subfolder in the same folder containing the *.icp file.

required
set_saturated_pixels_to_zero Union[bool, None]

If True, saturated pixels are set to all-zero (across all bands).

True
saturation_value int

Maximum digital number for camera sensor.

2 ** 12 - 1
Source code in massipipe/radiance.py
def __init__(
    self,
    radiance_calibration_file: Union[Path, str],
    set_saturated_pixels_to_zero: Union[bool, None] = True,
    saturation_value: int = 2**12 - 1,
):
    """Create radiance converter object.

    Parameters
    ----------
    radiance_calibration_file: Union[Path, str]
        Path to Imager Calibration Pack (*.icp) file. The *.icp file is a
        zip archive, unzipped into a subfolder next to the *.icp file.
    set_saturated_pixels_to_zero: bool, optional
        If True, saturated pixels are zeroed across all bands.
    saturation_value: int, optional
        Maximum digital number for the camera sensor.

    """
    self.radiance_calibration_file = Path(radiance_calibration_file)
    self.set_saturated_pixels_to_zero = bool(set_saturated_pixels_to_zero)
    self.saturation_value = saturation_value
    self.rc_dataset = RadianceCalibrationDataset(calibration_file=radiance_calibration_file)
    # Placeholders, replaced immediately by _get_rad_conv_frame()
    self.rad_conv_frame = np.array([])
    self.rad_conv_metadata = {}
    self._get_rad_conv_frame()

_get_rad_conv_frame()

Read radiance conversion frame from file and save as attribute.

Source code in massipipe/radiance.py
def _get_rad_conv_frame(self) -> None:
    """Load the radiance conversion frame and cache it on the instance.

    Raises
    ------
    ValueError
        If the calibration frame is binned, or does not have the expected
        900 samples x 600 bands.
    """
    frame, _, metadata = self.rc_dataset.get_rad_conv_frame()
    unbinned = metadata["sample binning"] == "1" and metadata["spectral binning"] == "1"
    if not unbinned:
        raise ValueError("Radiance conversion frame must have binning of 1.")
    full_resolution = metadata["samples"] == "900" and metadata["bands"] == "600"
    if not full_resolution:
        raise ValueError("Radiance conversion frame must have 900 samples and 600 bands.")
    self.rad_conv_frame = frame
    self.rad_conv_metadata = metadata

_get_best_matching_dark_frame(raw_image_metadata)

Get dark frame from calibration data that best matches input data.

Source code in massipipe/radiance.py
def _get_best_matching_dark_frame(self, raw_image_metadata: dict) -> tuple[NDArray, dict]:
    """Find the calibration dark frame closest to the image's gain/shutter."""
    gain = float(raw_image_metadata["gain"])
    shutter = float(raw_image_metadata["shutter"])
    dark_frame, _, dark_frame_metadata, _, _ = self.rc_dataset.get_closest_dark_frame(
        gain=gain,
        shutter=shutter,
    )
    return dark_frame, dark_frame_metadata

_scale_dark_frame(dark_frame, dark_frame_metadata, raw_image_metadata)

Scale dark frame to match binning for input image.

Source code in massipipe/radiance.py
def _scale_dark_frame(
    self,
    dark_frame: NDArray,
    dark_frame_metadata: dict,
    raw_image_metadata: dict,
) -> NDArray:
    """Bin and scale an unbinned dark frame to match the input image.

    Raises
    ------
    ValueError
        If the supplied dark frame is not unbinned (binning of 1).
    """
    if not (
        dark_frame_metadata["sample binning"] == "1"
        and dark_frame_metadata["spectral binning"] == "1"
    ):
        raise ValueError("Dark frame must have binning of 1.")
    sample_bin = int(raw_image_metadata["sample binning"])
    spectral_bin = int(raw_image_metadata["spectral binning"])
    binned = mpu.bin_image(
        dark_frame,
        sample_bin_size=sample_bin,
        channel_bin_size=spectral_bin,
    )
    # NOTE: Dark frame not scaled based on differences in gain and shutter because
    # the best matching dark frame has (approx.) the same values already.
    return binned * float(sample_bin * spectral_bin)

_scale_rad_conv_frame(raw_image_metadata)

Scale radiance conversion frame to match input binning, gain and shutter.

Source code in massipipe/radiance.py
def _scale_rad_conv_frame(self, raw_image_metadata: dict) -> NDArray:
    """Scale the radiance conversion frame to the input's binning, gain and shutter."""
    sample_bin = int(raw_image_metadata["sample binning"])
    spectral_bin = int(raw_image_metadata["spectral binning"])

    # Binning reduces the per-pixel conversion factor by the bin area
    binning_factor = 1.0 / (
        float(raw_image_metadata["sample binning"])
        * float(raw_image_metadata["spectral binning"])
    )

    # Gain: metadata values are in dB, converted to linear scale here
    rad_conv_gain = 10 ** (float(self.rad_conv_metadata["gain"]) / 20.0)
    input_gain = 10 ** (float(raw_image_metadata["gain"]) / 20.0)
    gain_factor = rad_conv_gain / input_gain

    # Shutter (exposure time) ratio between calibration frame and input
    rad_conv_shutter = float(self.rad_conv_metadata["shutter"])
    input_shutter = float(raw_image_metadata["shutter"])
    shutter_factor = rad_conv_shutter / input_shutter

    # Average-bin the conversion frame down to the input image's dimensions
    frame = mpu.bin_image(
        self.rad_conv_frame,
        sample_bin_size=sample_bin,
        channel_bin_size=spectral_bin,
        average=True,
    )

    # Apply all scale factors in one multiplication
    scaling_factor = binning_factor * gain_factor * shutter_factor
    return frame * scaling_factor

convert_raw_image_to_radiance(raw_image, raw_image_metadata)

Convert raw image (3D array) to radiance image.

Parameters:

Name Type Description Default
raw_image NDArray

Raw hyperspectral image, shape (n_lines, n_samples, n_channels). The image is assumed to have been created by a Resonon Pika L camera, which has 900 spatial pixels x 600 spectral channels before any binning has been applied. Typically, spectral binning with a bin size of 2 is applied during image acquisition, resulting in images with shape (n_lines, 900, 300). It is assumed that no spectral or spatial (sample) cropping has been applied. Where binning has been applied, it is assumed that n_samples * sample_bin_size = 900 and n_channels * channel_bin_size = 600.

required
raw_image_metadata dict

ENVI metadata formatted as dict. See spectral.io.envi.open().

required

Returns:

Type Description
NDArray

Radiance image with same shape as raw image, with spectral radiance in units of microflicks = 1e-5 W/(sr*m2*nm). Microflicks are used to be consistent with Resonon formatting, and because microflick values typically are in a range suitable for (memory-efficient) encoding as 16-bit unsigned integer.

Raises:

Type Description
ValueError:

In case the raw image does not have the expected dimensions.

References
Source code in massipipe/radiance.py
def convert_raw_image_to_radiance(
    self,
    raw_image: NDArray,
    raw_image_metadata: dict,
) -> NDArray:
    """Convert raw image (3D array) to radiance image.

    Parameters
    ----------
    raw_image: NDArray
        Raw hyperspectral image, shape (n_lines, n_samples, n_channels),
        acquired with a Resonon Pika L camera (900 spatial pixels x
        600 spectral channels before binning). No spectral or spatial
        (sample) cropping is assumed, so n_samples*sample_bin_size must
        equal 900 and n_channels*channel_bin_size must equal 600.
    raw_image_metadata: dict
        ENVI metadata formatted as dict (see spectral.io.envi.open()).

    Returns
    -------
    NDArray
        Radiance image with the same shape as the raw image, encoded as
        16-bit unsigned integers, with spectral radiance in units of
        microflicks (1e-5 W/(sr*m2*nm)). Microflicks are consistent with
        Resonon formatting, and their typical value range suits
        (memory-efficient) uint16 encoding.

    Raises
    ------
    ValueError:
        In case the raw image does not have the expected dimensions.

    References
    ----------
    - ["flick" unit](https://en.wikipedia.org/wiki/Flick_(physics))
    """
    # Validate that (binned) dimensions are consistent with the Pika L sensor
    unbinned_samples = int(raw_image_metadata["samples"]) * int(
        raw_image_metadata["sample binning"]
    )
    if unbinned_samples != 900:
        raise ValueError(
            "Sample count and binning does not correspond to "
            "900 samples in the original image."
        )
    unbinned_bands = int(raw_image_metadata["bands"]) * int(
        raw_image_metadata["spectral binning"]
    )
    if unbinned_bands != 600:
        raise ValueError(
            "Spectral band count and binning does not correspond to "
            "600 spectral bands in the original image."
        )

    # Calibration frames, scaled to match the input image's acquisition settings
    dark_frame, dark_frame_metadata = self._get_best_matching_dark_frame(raw_image_metadata)
    dark_frame = self._scale_dark_frame(dark_frame, dark_frame_metadata, raw_image_metadata)
    rad_conv_frame = self._scale_rad_conv_frame(raw_image_metadata)

    # Mirror calibration frames along the sample axis when the header requests it
    if raw_image_metadata.get("flip radiometric calibration") == "True":
        dark_frame = np.flip(dark_frame, axis=1)
        rad_conv_frame = np.flip(rad_conv_frame, axis=1)

    # Dark-current subtraction and conversion to radiance (microflicks).
    # The calibration frames broadcast across the line (first) axis.
    radiance_image = (raw_image - dark_frame) * rad_conv_frame

    # Clip non-physical negative values to zero
    radiance_image[radiance_image < 0] = 0

    # Optionally zero out pixels where any spectral channel saturated
    if self.set_saturated_pixels_to_zero:
        radiance_image[np.any(raw_image >= self.saturation_value, axis=2)] = 0

    # uint16 is compact and sufficient for microflick-scaled radiance
    return radiance_image.astype(np.uint16)

convert_raw_file_to_radiance(raw_header_path, radiance_header_path, interleave='bip')

Read raw image file, convert to radiance, and save to file.

Parameters:

Name Type Description Default
raw_header_path Union[Path, str]

Path to raw hyperspectral image acquired with Resonon Pika L camera.

required
radiance_header_path Union[Path, str]

Path to save converted radiance image to. The name of the header file should match the 'interleave' argument (default: bip), e.g. 'radiance_image.bip.hdr'.

required
interleave str

String indicating how binary image file is organized. See spectral.io.envi.save_image().

'bip'
Notes

The radiance image is saved with the same metadata as the raw image.

Source code in massipipe/radiance.py
def convert_raw_file_to_radiance(
    self,
    raw_header_path: Union[Path, str],
    radiance_header_path: Union[Path, str],
    interleave: str = "bip",
) -> None:
    """Read raw image file, convert to radiance, and save to file.

    Parameters
    ----------
    raw_header_path: Path | str
        Path to raw hyperspectral image acquired with Resonon Pika L camera.
    radiance_header_path: Path | str
        Path to save converted radiance image to. The name of the header
        file should match the 'interleave' argument (default: bip),
        e.g. 'radiance_image.bip.hdr'.
    interleave: str, {'bip','bil','bsq'}, default 'bip'
        String indicating how the binary image file is organized.
        See spectral.io.envi.save_image().

    Notes
    -----
    The radiance image is saved with the same metadata as the raw image.
    """
    raw_image, _, raw_metadata = mpu.read_envi(raw_header_path)
    mpu.save_envi(
        radiance_header_path,
        self.convert_raw_image_to_radiance(raw_image, raw_metadata),
        raw_metadata,  # Radiance output reuses the raw image's metadata
        interleave=interleave,
    )

ReflectanceConverter

ReflectanceConverter class for converting images from Resonon Pika L cameras to reflectance.

This class provides methods for converting radiance images to reflectance using per-image or mean irradiance spectra. It supports optional Gaussian smoothing of the irradiance spectrum and additional spectral smoothing using a Savitzky-Golay filter.

Attributes:

Name Type Description
wl_min float

Lower wavelength bound (nm) for the reflectance image.

wl_max float

Upper wavelength bound (nm) for the reflectance image.

conv_irrad_with_gauss bool

Indicates if the irradiance spectrum should be smoothed with a Gaussian kernel.

fwhm_irrad_smooth float

Full-width-half-maximum for the Gaussian smoothing kernel (in nm).

smooth_spectra bool

If True, applies a Savitzky-Golay filter to the reflectance spectra.

refl_from_mean_irrad bool

If True, uses a computed mean irradiance value for conversion instead of per-image irradiance.

ref_irrad_spec_mean NDArray

Mean irradiance spectrum used for reflectance conversion when refl_from_mean_irrad is True.

ref_irrad_spec_wl NDArray

Wavelength array corresponding to the irradiance spectrum.

Source code in massipipe/reflectance.py
class ReflectanceConverter:
    """ReflectanceConverter class for converting images from Resonon Pika L cameras to reflectance.

    This class provides methods for converting radiance images to reflectance using per-image
    or mean irradiance spectra. It supports optional Gaussian smoothing of the irradiance spectrum
    and additional spectral smoothing using a Savitzky-Golay filter.

    Attributes
    ----------
    wl_min : float
        Lower wavelength bound (nm) for the reflectance image.
    wl_max : float
        Upper wavelength bound (nm) for the reflectance image.
    conv_irrad_with_gauss : bool
        Indicates if the irradiance spectrum should be smoothed with a Gaussian kernel.
    fwhm_irrad_smooth : float
        Full-width-half-maximum for the Gaussian smoothing kernel (in nm).
    smooth_spectra : bool
        If True, applies a Savitzky-Golay filter to the reflectance spectra.
    refl_from_mean_irrad : bool
        If True, uses a computed mean irradiance value for conversion instead of
        per-image irradiance.
    ref_irrad_spec_mean : NDArray
        Mean irradiance spectrum used for reflectance conversion when
        refl_from_mean_irrad is True.
    ref_irrad_spec_wl : NDArray
        Wavelength array corresponding to the irradiance spectrum.
    """

    def __init__(
        self,
        wl_min: Optional[float] = None,
        wl_max: Optional[float] = None,
        conv_irrad_with_gauss: Optional[bool] = True,
        fwhm_irrad_smooth: Optional[float] = 3.5,
        smooth_spectra: Optional[bool] = False,
        refl_from_mean_irrad: bool = False,
        irrad_spec_paths: Optional[Iterable[Union[Path, str]]] = None,
    ) -> None:
        """Initialize reflectance converter

        Parameters
        ----------
        wl_min : Optional[float], default None
            Minimum wavelength (nm) to include in reflectance image.
            If None, -float("inf") is used, and no lower limit is applied.
        wl_max : Optional[float], default None
            Maximum wavelength (nm) to include in reflectance image.
            If None, float("inf") is used, and no upper limit is applied.
        conv_irrad_with_gauss: bool, default True
            Indicate if irradiance spectrum should be smoothed with Gaussian kernel.
            This may be useful if irradiance is measured with a higher spectral
            resolution than radiance, and thus has sharper "spikes".
        fwhm_irrad_smooth: float, default 3.5
            Full-width-half-maximum for Gaussian smoothing kernel, in nanometers.
            Only used if conv_irrad_with_gauss==True
        smooth_spectra: bool, default False
            Whether to smooth the reflectance spectra using a Savitzky-Golay filter
        refl_from_mean_irrad: bool, default False
            If True, the mean irradiance for the whole dataset is used for
            calculating reflectance, rather than the irradiance for a single image.
            This can be useful if individual irradiance are compromised and
            using the mean is more "robust". Paths to the irradiance files
            (irrad_spec_paths) must be specified.
        irrad_spec_paths : Optional[Iterable[Union[Path, str]]], default None
            List of paths to irradiance spectra for the dataset.
            If specified (not None), a mean irradiance value is calculated based
            on the spectra, and this irradiance value is used for every reflectance
            conversion, rather than the irradiance measured for each
            individual image.

        Notes
        -----
        The signal-to-noise ratio of both radiance images and irradiance
        spectra is generally lower at the low and high ends. When
        radiance is divided by noisy irradiance values close to zero, the
        noise can "blow up". Limiting the wavelength range can ensure
        that the reflectance images have more well-behaved values.
        """

        self.wl_min = wl_min if wl_min is not None else -float("inf")
        self.wl_max = wl_max if wl_max is not None else float("inf")
        # None is treated the same as True (smoothing enabled by default)
        self.conv_irrad_with_gauss = (
            True if conv_irrad_with_gauss or (conv_irrad_with_gauss is None) else False
        )
        self.fwhm_irrad_smooth = fwhm_irrad_smooth if fwhm_irrad_smooth is not None else 3.5
        self.smooth_spectra = bool(smooth_spectra)

        # Mean irradiance is only computed when both the flag and the paths are given
        if refl_from_mean_irrad and irrad_spec_paths is not None:
            self.refl_from_mean_irrad = True
            irrad_spec_mean, irrad_wl, _ = self._get_mean_irrad_spec(irrad_spec_paths)
        else:
            self.refl_from_mean_irrad = False
            irrad_spec_mean, irrad_wl = np.array([]), np.array([])
        self.ref_irrad_spec_mean = irrad_spec_mean
        self.ref_irrad_spec_wl = irrad_wl

    @staticmethod
    def _get_mean_irrad_spec(
        irrad_spec_paths: Iterable[Union[Path, str]],
    ) -> Tuple[NDArray, NDArray, NDArray]:
        """Read irradiance spectra from file and calculate mean"""
        irrad_spectra = []
        # Missing files are skipped silently; only existing spectra contribute
        for irrad_spec_path in irrad_spec_paths:
            if Path(irrad_spec_path).exists():
                irrad_spec, irrad_wl, _ = mpu.read_envi(irrad_spec_path)
                irrad_spectra.append(irrad_spec.squeeze())
        if not irrad_spectra:
            raise FileNotFoundError(f"No valid irradiance spectra found.")
        irrad_spectra = np.array(irrad_spectra)
        irrad_spec_mean = np.mean(irrad_spectra, axis=0)
        # Wavelengths are taken from the last file read (assumed equal for all)
        return irrad_spec_mean, irrad_wl, irrad_spectra

    def conv_spec_with_gaussian(self, spec: NDArray, wl: NDArray) -> NDArray:
        """Convolve spectrum with Gaussian kernel to smooth / blur spectral details

        Parameters
        ----------
        spec: NDArray
            Input spectrum, shape (n_bands,)
        wl: NDArray, nanometers
            Wavelengths corresponding to each spectrum value, shape (n_bands,)

        Returns
        -------
        spec_filtered: NDArray
            Filtered / smoothed version of spec, with same dimensions

        Notes
        -----
        When the kernel extends outside the data while filtering, edges are handled
        by repeating the nearest sampled value (edge value).

        """
        # NOTE(review): 0.588705 = sqrt(2*ln(2))/2, i.e. (FWHM/2)*sqrt(2*ln(2)).
        # The standard Gaussian relation sigma = FWHM/(2*sqrt(2*ln(2))) gives
        # factor ~0.4247 — confirm whether the wider kernel is intended.
        sigma_wl = self.fwhm_irrad_smooth * 0.588705  # sigma = FWHM / 2*sqrt(2*ln(2))
        dwl = np.mean(np.diff(wl))  # Downwelling wavelength sampling dist.
        sigma_pix = sigma_wl / dwl  # Sigma in units of spectral samples
        spec_filtered = gaussian_filter1d(input=spec, sigma=sigma_pix, mode="nearest")
        return spec_filtered

    @staticmethod
    def _interpolate_irrad_to_image_wl(
        irrad_spec: NDArray,
        irrad_wl: NDArray,
        image_wl: NDArray,
    ) -> NDArray:
        """Interpolate downwelling spectrum to image wavelengths"""
        return np.interp(x=image_wl, xp=irrad_wl, fp=irrad_spec)

    def convert_radiance_image_to_reflectance(
        self,
        rad_image: NDArray,
        rad_wl: NDArray,
        irrad_spec: NDArray,
        irrad_wl: NDArray,
    ) -> Tuple[NDArray, NDArray, NDArray]:
        """Convert radiance image to reflectance using downwelling spectrum

        Parameters
        ----------
        rad_image: NDArray
            Spectral radiance image in units of microflicks = 1e-5 W/(sr*m2*nm)
            Shape (n_lines, n_samples, n_bands)
        rad_wl: NDArray
            Wavelengths (in nanometers) corresponding to each band in rad_image
        irrad_spec: NDArray
            Spectral irradiance in units of W/(m2*nm)
        irrad_wl: NDArray
            Wavelengths (in nanometers) corresponding to each band in irrad_spec

        Returns
        -------
        refl_image: NDArray
            Reflectance image, unitless. Shape is same as rad_image for the
            first 2 axes, but may be less than that of rad_image along
            3rd axis due to limiting of wavelength range.
        refl_wl: NDArray
            Wavelengths for reflectance image
        irrad_spec:
            Irradiance spectrum used for reflectance conversion.
            The spectrum has been interpolated to match the wavelengths
            of refl_image.

        """

        # Check that spectrum is 1D, then expand to 3D for broadcasting
        irrad_spec = np.squeeze(irrad_spec)
        if irrad_spec.ndim != 1:
            raise ValueError("irrad_spec must be 1-dimensional")

        # Limit output wavelength range
        valid_image_wl_ind = (
            (rad_wl >= self.wl_min)
            & (rad_wl <= self.wl_max)
            & (rad_wl >= irrad_wl[0])
            & (rad_wl <= irrad_wl[-1])
        )
        rad_wl = rad_wl[valid_image_wl_ind]
        rad_image = rad_image[:, :, valid_image_wl_ind]

        # Make irradiance spectrum compatible with image
        irrad_spec = irrad_spec * 100_000  # Convert from W/(m2*nm) to uW/(cm2*um)
        if self.conv_irrad_with_gauss:
            irrad_spec = self.conv_spec_with_gaussian(irrad_spec, irrad_wl)
        irrad_spec = self._interpolate_irrad_to_image_wl(irrad_spec, irrad_wl, rad_wl)
        irrad_spec = np.expand_dims(irrad_spec, axis=(0, 1))

        # Set zero-valued irradiance elements to small value to avoid divide-by-zero
        irrad_spec[irrad_spec < 1.0] = 1.0  # <= 1 uW/(cm2*um)

        # Convert to reflectance, assuming Lambertian (perfectly diffuse) surface
        refl_image = np.pi * (rad_image.astype(np.float32) / irrad_spec.astype(np.float32))
        refl_wl = rad_wl

        # Spectral smoothing (optional)
        if self.smooth_spectra:
            refl_image = mpu.savitzky_golay_filter(refl_image)

        return refl_image, refl_wl, irrad_spec

    def convert_radiance_file_to_reflectance(
        self,
        radiance_image_header: Union[Path, str],
        irradiance_header: Union[Path, str],
        reflectance_image_header: Union[Path, str],
    ) -> None:
        """Read radiance image from file, convert to reflectance and save

        Parameters
        ----------
        radiance_image_header: Union[Path, str]
            Path to header file for radiance image.
        irradiance_header: Union[Path, str]
            Path to ENVI file containing irradiance measurement
            corresponding to radiance image file.
            Not used if self.refl_from_mean_irrad is True - in this
            case, it can be set to None.
        reflectance_image_header: Union[Path, str]
            Path to header file for (output) reflectance image.
            Binary file will be saved with same name, except .hdr extension.
        """
        rad_image, rad_wl, rad_meta = mpu.read_envi(radiance_image_header)
        # Use pre-computed dataset mean irradiance if configured
        if self.refl_from_mean_irrad:
            irrad_spec = self.ref_irrad_spec_mean
            irrad_wl = self.ref_irrad_spec_wl
        else:
            irrad_spec, irrad_wl, _ = mpu.read_envi(irradiance_header)

        refl_im, refl_wl, _ = self.convert_radiance_image_to_reflectance(
            rad_image, rad_wl, irrad_spec, irrad_wl
        )

        # Reuse radiance metadata, updating wavelengths to the (possibly cropped) range
        wl_str = mpu.array_to_header_string(refl_wl)
        refl_meta = rad_meta
        refl_meta["wavelength"] = wl_str
        mpu.save_envi(reflectance_image_header, refl_im, refl_meta)

    def add_irradiance_spectrum_to_header(
        self,
        radiance_image_header: Union[Path, str],
        irradiance_header: Optional[Union[Path, str]],
    ) -> None:
        """Add irradiance spectrum to radiance image header

        Parameters
        ----------
        radiance_image_header : Union[Path, str]
            Path to radiance image
        irradiance_header : Union[Path, str, None]
            Path to irradiance spectrum header.
            If refl_from_mean_irrad == True for ReflectanceConverter,
            the calculated mean spectrum is used, and irradiance_header can be set to None.
        """
        if self.refl_from_mean_irrad:
            irrad_spec = self.ref_irrad_spec_mean
            irrad_wl = self.ref_irrad_spec_wl
        elif irradiance_header is not None:
            irrad_spec, irrad_wl, _ = mpu.read_envi(irradiance_header)
            irrad_spec = np.squeeze(irrad_spec)  # Convert to 1D array
        else:
            raise ValueError("Must specify irradiance spectrum file if not using mean irradiance.")

        _, rad_wl = mpu.read_envi_header(radiance_image_header)
        irrad_spec_interp = self._interpolate_irrad_to_image_wl(irrad_spec, irrad_wl, rad_wl)
        irrad_spec_interp = mpu.irrad_si_nm_to_si_um(irrad_spec_interp)  # Convert to W/(m2*um)
        mpu.add_header_irradiance(irrad_spec_interp, radiance_image_header)

    def convert_radiance_file_with_irradiance_to_reflectance(
        self,
        radiance_image_header: Union[Path, str],
        reflectance_image_header: Union[Path, str],
    ) -> None:
        """Convert radiance image with irradiance spectrum in header to reflectance

        The irradiance information is read from the field "solar irradiance" in the
        header of the radiance image file. The irradiance is assumed to be in units
        W/(m2*um), which is standard for ENVI files, and the irradiance values are
        assumed to match the wavelengths of the image.

        Parameters
        ----------
        radiance_image_header: Union[Path, str]
            Path to header file for radiance image.
        reflectance_image_header: Union[Path, str]
            Path to header file for (output) reflectance image.
            Binary file will be saved with same name, except .hdr extension.

        """
        # Read radiance image and header
        rad_image, rad_wl, rad_meta = mpu.read_envi(radiance_image_header)

        # Convert irradiance spectrum string to numeric array
        if "solar irradiance" in rad_meta:
            irrad_spec = mpu.header_string_to_array(rad_meta["solar irradiance"])
        else:
            raise KeyError("Field 'solar irradiance' missing from radiance header")
        assert len(irrad_spec) == len(rad_wl)

        # Limit output wavelength range
        valid_image_wl_ind = (rad_wl >= self.wl_min) & (rad_wl <= self.wl_max)
        rad_wl = rad_wl[valid_image_wl_ind]
        rad_image = rad_image[:, :, valid_image_wl_ind]
        irrad_spec = irrad_spec[valid_image_wl_ind]

        # Make irradiance spectrum compatible with image
        irrad_spec = mpu.irrad_si_um_to_uflicklike(irrad_spec)  # Convert W/(m2*um) to uW/(cm2*um)
        irrad_spec = np.expand_dims(irrad_spec, axis=(0, 1))

        # Convert to reflectance, assuming Lambertian (perfectly diffuse) surface
        refl_image = np.pi * (rad_image.astype(np.float32) / irrad_spec.astype(np.float32))

        # Spectral smoothing (optional)
        if self.smooth_spectra:
            refl_image = mpu.savitzky_golay_filter(refl_image)

        # Update header wavelength info
        wl_str = mpu.array_to_header_string(rad_wl)
        refl_meta = rad_meta
        refl_meta["wavelength"] = wl_str
        mpu.save_envi(reflectance_image_header, refl_image, refl_meta)

__init__(wl_min=None, wl_max=None, conv_irrad_with_gauss=True, fwhm_irrad_smooth=3.5, smooth_spectra=False, refl_from_mean_irrad=False, irrad_spec_paths=None)

Parameters:

Name Type Description Default
wl_min Optional[float]

Minimum wavelength (nm) to include in reflectance image. If None, -float("inf") is used, and no lower limit is applied.

None
wl_max Optional[float]

Maximum wavelength (nm) to include in reflectance image. If None, float("inf") is used, and no upper limit is applied.

None
conv_irrad_with_gauss Optional[bool]

Indicate if irradiance spectrum should be smoothed with Gaussian kernel. This may be useful if irradiance is measured with a higher spectral resolution than radiance, and thus has sharper "spikes".

True
fwhm_irrad_smooth Optional[float]

Full-width-half-maximum for Gaussian smoothing kernel, in nanometers. Only used if conv_irrad_with_gauss==True

3.5
smooth_spectra Optional[bool]

Whether to smooth the reflectance spectra using a Savitzky-Golay filter

False
refl_from_mean_irrad bool

If True, the mean irradiance for the whole dataset is used for calculating reflectance, rather than the irradiance for a single image. This can be useful if individual irradiance are compromised and using the mean is more "robust". Paths to the irradiance files (irrad_spec_paths) must be specified.

False
irrad_spec_paths Optional[Iterable[Union[Path, str]]]

List of paths to irradiance spectra for the dataset. If specified (not None), a mean irradiance value is calculated based on the spectra, and this irradiance value is used for every reflectance conversion, rather than the irradiance measured for each individual image.

None
Notes

The signal-to-noise ratio of both radiance images and irradiance spectra is generally lower at the low and high ends. When radiance is divided by noisy irradiance values close to zero, the noise can "blow up". Limiting the wavelength range can ensure that the reflectance images have more well-behaved values.

Source code in massipipe/reflectance.py
def __init__(
    self,
    wl_min: Optional[float] = None,
    wl_max: Optional[float] = None,
    conv_irrad_with_gauss: Optional[bool] = True,
    fwhm_irrad_smooth: Optional[float] = 3.5,
    smooth_spectra: Optional[bool] = False,
    refl_from_mean_irrad: bool = False,
    irrad_spec_paths: Optional[Iterable[Union[Path, str]]] = None,
) -> None:
    """Initialize reflectance converter

    Parameters
    ----------
    wl_min : Optional[float], default None
        Minimum wavelength (nm) to include in the reflectance image.
        None means no lower limit (-inf).
    wl_max : Optional[float], default None
        Maximum wavelength (nm) to include in the reflectance image.
        None means no upper limit (+inf).
    conv_irrad_with_gauss: bool, default True
        Whether to smooth the irradiance spectrum with a Gaussian kernel.
        Useful when irradiance is measured at higher spectral resolution
        than radiance and therefore has sharper "spikes".
    fwhm_irrad_smooth: float, default 3.5
        Full-width-half-maximum of the Gaussian smoothing kernel (nm).
        Only used when conv_irrad_with_gauss is enabled.
    smooth_spectra: bool, default False
        Whether to smooth reflectance spectra with a Savitzky-Golay filter.
    refl_from_mean_irrad: bool, default False
        If True, the mean irradiance for the whole dataset is used when
        calculating reflectance, instead of each image's own irradiance.
        Useful when individual irradiance measurements are compromised
        and the mean is more "robust". Requires irrad_spec_paths.
    irrad_spec_paths : Optional[Iterable[Union[Path, str]]], default None
        Paths to irradiance spectra for the dataset. If given, a mean
        irradiance spectrum is calculated from them and used for every
        reflectance conversion, rather than the irradiance measured for
        each individual image.

    Notes
    -----
    The signal-to-noise ratio of both radiance images and irradiance
    spectra is generally lower at the low and high ends. When radiance is
    divided by noisy irradiance values close to zero, the noise can
    "blow up". Limiting the wavelength range can ensure that the
    reflectance images have more well-behaved values.
    """

    self.wl_min = -float("inf") if wl_min is None else wl_min
    self.wl_max = float("inf") if wl_max is None else wl_max
    # None behaves like the default (True); any other value is coerced to bool
    self.conv_irrad_with_gauss = conv_irrad_with_gauss is None or bool(conv_irrad_with_gauss)
    self.fwhm_irrad_smooth = 3.5 if fwhm_irrad_smooth is None else fwhm_irrad_smooth
    self.smooth_spectra = bool(smooth_spectra)

    # Mean irradiance is only computed when both the flag and the paths are given
    if refl_from_mean_irrad and irrad_spec_paths is not None:
        self.refl_from_mean_irrad = True
        mean_spec, mean_wl, _ = self._get_mean_irrad_spec(irrad_spec_paths)
    else:
        self.refl_from_mean_irrad = False
        mean_spec, mean_wl = np.array([]), np.array([])
    self.ref_irrad_spec_mean = mean_spec
    self.ref_irrad_spec_wl = mean_wl

_get_mean_irrad_spec(irrad_spec_paths) staticmethod

Read irradiance spectra from file and calculate mean

Source code in massipipe/reflectance.py
@staticmethod
def _get_mean_irrad_spec(
    irrad_spec_paths: Iterable[Union[Path, str]],
) -> Tuple[NDArray, NDArray, NDArray]:
    """Read irradiance spectra from file and calculate mean"""
    collected = []
    irrad_wl = None
    for spec_path in irrad_spec_paths:
        if not Path(spec_path).exists():
            continue  # Missing files are skipped silently
        spec, irrad_wl, _ = mpu.read_envi(spec_path)
        collected.append(spec.squeeze())
    if not collected:
        raise FileNotFoundError("No valid irradiance spectra found.")
    # Wavelengths are taken from the last file read (assumed equal for all)
    irrad_spectra = np.array(collected)
    return np.mean(irrad_spectra, axis=0), irrad_wl, irrad_spectra

conv_spec_with_gaussian(spec, wl)

Convolve spectrum with Gaussian kernel to smooth / blur spectral details

Parameters:

Name Type Description Default
spec NDArray

Input spectrum, shape (n_bands,)

required
wl NDArray

Wavelengths corresponding to each spectrum value, shape (n_bands,)

required

Returns:

Name Type Description
spec_filtered NDArray

Filtered / smoothed version of spec, with same dimensions

Notes

When the kernel extends outside the data while filtering, edges are handled by repeating the nearest sampled value (edge value).

Source code in massipipe/reflectance.py
def conv_spec_with_gaussian(self, spec: NDArray, wl: NDArray) -> NDArray:
    """Convolve spectrum with Gaussian kernel to smooth / blur spectral details

    Parameters
    ----------
    spec: NDArray
        Input spectrum, shape (n_bands,)
    wl: NDArray, nanometers
        Wavelengths corresponding to each spectrum value, shape (n_bands,)

    Returns
    -------
    spec_filtered: NDArray
        Filtered / smoothed version of spec, with same dimensions

    Notes
    -----
    When the kernel extends outside the data while filtering, edges are handled
    by repeating the nearest sampled value (edge value).

    """
    # For a Gaussian, FWHM = 2*sqrt(2*ln(2))*sigma, so
    # sigma = FWHM / (2*sqrt(2*ln(2))) = FWHM * 0.4246609.
    # BUGFIX: the previous factor 0.588705 equals sqrt(2*ln(2))/2, i.e. it
    # computed (FWHM/2)*sqrt(2*ln(2)) and over-smoothed by a factor ~1.39.
    sigma_wl = self.fwhm_irrad_smooth * 0.4246609  # sigma = FWHM / (2*sqrt(2*ln(2)))
    dwl = np.mean(np.diff(wl))  # Downwelling wavelength sampling dist. (nm)
    sigma_pix = sigma_wl / dwl  # Sigma in units of spectral samples
    spec_filtered = gaussian_filter1d(input=spec, sigma=sigma_pix, mode="nearest")
    return spec_filtered

_interpolate_irrad_to_image_wl(irrad_spec, irrad_wl, image_wl) staticmethod

Interpolate downwelling spectrum to image wavelengths

Source code in massipipe/reflectance.py
@staticmethod
def _interpolate_irrad_to_image_wl(
    irrad_spec: NDArray,
    irrad_wl: NDArray,
    image_wl: NDArray,
) -> NDArray:
    """Resample downwelling irradiance spectrum onto the image wavelength grid"""
    # Linear interpolation; wavelengths outside irrad_wl get the end values
    return np.interp(image_wl, irrad_wl, irrad_spec)

convert_radiance_image_to_reflectance(rad_image, rad_wl, irrad_spec, irrad_wl)

Convert radiance image to reflectance using downwelling spectrum

Parameters:

Name Type Description Default
rad_image NDArray

Spectral radiance image in units of microflicks = 1e-5 W/(sr*m2*nm). Shape (n_lines, n_samples, n_bands)

required
rad_wl NDArray

Wavelengths (in nanometers) corresponding to each band in rad_image

required
irrad_spec NDArray

Spectral irradiance in units of W/(m2*nm)

required
irrad_wl NDArray

Wavelengths (in nanometers) corresponding to each band in irrad_spec

required

Returns:

Name Type Description
refl_image NDArray

Reflectance image, unitless. Shape is same as rad_image for the first 2 axes, but may be less than that of rad_image along 3rd axis due to limiting of wavelength range.

refl_wl NDArray

Wavelengths for reflectance image

irrad_spec NDArray

Irradiance spectrum used for reflectance conversion. The spectrum has been interpolated to match the wavelengths of refl_image.

Source code in massipipe/reflectance.py
def convert_radiance_image_to_reflectance(
    self,
    rad_image: NDArray,
    rad_wl: NDArray,
    irrad_spec: NDArray,
    irrad_wl: NDArray,
) -> Tuple[NDArray, NDArray, NDArray]:
    """Convert radiance image to reflectance using downwelling spectrum

    Parameters
    ----------
    rad_image: NDArray
        Spectral radiance image in units of microflicks = 1e-5 W/(sr*m2*nm),
        shape (n_lines, n_samples, n_bands).
    rad_wl: NDArray
        Wavelengths (nm) corresponding to each band of rad_image.
    irrad_spec: NDArray
        Spectral irradiance in units of W/(m2*nm).
    irrad_wl: NDArray
        Wavelengths (nm) corresponding to each element of irrad_spec.

    Returns
    -------
    refl_image: NDArray
        Unitless reflectance image. The first two axes match rad_image;
        the band axis may be shorter due to wavelength-range limiting.
    refl_wl: NDArray
        Wavelengths for the reflectance image.
    irrad_spec: NDArray
        Irradiance spectrum actually used for the conversion, interpolated
        to the wavelengths of refl_image.

    """

    # A single 1D spectrum is required for broadcasting over the image
    irrad_spec = np.squeeze(irrad_spec)
    if irrad_spec.ndim != 1:
        raise ValueError("irrad_spec must be 1-dimensional")

    # Keep only bands inside both the configured limits and the range
    # covered by the irradiance measurement
    band_mask = (
        (rad_wl >= self.wl_min)
        & (rad_wl <= self.wl_max)
        & (rad_wl >= irrad_wl[0])
        & (rad_wl <= irrad_wl[-1])
    )
    refl_wl = rad_wl[band_mask]
    rad_image = rad_image[:, :, band_mask]

    # Bring irradiance to units compatible with microflick radiance:
    # W/(m2*nm) -> uW/(cm2*um), then optionally smooth and resample
    irrad_spec = irrad_spec * 100_000
    if self.conv_irrad_with_gauss:
        irrad_spec = self.conv_spec_with_gaussian(irrad_spec, irrad_wl)
    irrad_spec = self._interpolate_irrad_to_image_wl(irrad_spec, irrad_wl, refl_wl)
    irrad_spec = irrad_spec[np.newaxis, np.newaxis, :]

    # Avoid divide-by-zero: clamp irradiance to at least 1 uW/(cm2*um)
    irrad_spec[irrad_spec < 1.0] = 1.0

    # Lambertian (perfectly diffuse) surface assumption: R = pi * L / E
    refl_image = np.pi * (rad_image.astype(np.float32) / irrad_spec.astype(np.float32))

    # Optional spectral smoothing
    if self.smooth_spectra:
        refl_image = mpu.savitzky_golay_filter(refl_image)

    return refl_image, refl_wl, irrad_spec

convert_radiance_file_to_reflectance(radiance_image_header, irradiance_header, reflectance_image_header)

Read radiance image from file, convert to reflectance and save

Parameters:

Name Type Description Default
radiance_image_header Union[Path, str]

Path to header file for radiance image.

required
irradiance_header Union[Path, str]

Path to ENVI file containing irradiance measurement corresponding to radiance image file. Not used if self.refl_from_mean_irrad is True - in this case, it can be set to None.

required
reflectance_image_header Union[Path, str]

Path to header file for (output) reflectance image. Binary file will be saved with same name, except .hdr extension.

required
Source code in massipipe/reflectance.py
def convert_radiance_file_to_reflectance(
    self,
    radiance_image_header: Union[Path, str],
    irradiance_header: Union[Path, str],
    reflectance_image_header: Union[Path, str],
) -> None:
    """Read a radiance image from file, convert it to reflectance, and save.

    Parameters
    ----------
    radiance_image_header: Union[Path, str]
        Path to header file for radiance image.
    irradiance_header: Union[Path, str]
        Path to ENVI file containing irradiance measurement
        corresponding to radiance image file.
        Not used if self.refl_from_mean_irrad is True - in this
        case, it can be set to None.
    reflectance_image_header: Union[Path, str]
        Path to header file for (output) reflectance image.
        Binary file will be saved with same name, except .hdr extension.
    """
    radiance, radiance_wl, header_metadata = mpu.read_envi(radiance_image_header)

    # Pick irradiance source: precomputed mean spectrum, or per-image file
    if self.refl_from_mean_irrad:
        irradiance = self.ref_irrad_spec_mean
        irradiance_wl = self.ref_irrad_spec_wl
    else:
        irradiance, irradiance_wl, _ = mpu.read_envi(irradiance_header)

    reflectance, reflectance_wl, _ = self.convert_radiance_image_to_reflectance(
        radiance, radiance_wl, irradiance, irradiance_wl
    )

    # Reuse the radiance metadata, updating only the wavelength field
    header_metadata["wavelength"] = mpu.array_to_header_string(reflectance_wl)
    mpu.save_envi(reflectance_image_header, reflectance, header_metadata)

add_irradiance_spectrum_to_header(radiance_image_header, irradiance_header)

Add irradiance spectrum to radiance image header

Parameters:

Name Type Description Default
radiance_image_header Union[Path, str]

Path to radiance image

required
irradiance_header Union[Path, str, None]

Path to irradiance spectrum header. If refl_from_mean_irrad == True for ReflectanceConverter, the calculated mean spectrum is used, and irradiance_header can be set to None.

required
Source code in massipipe/reflectance.py
def add_irradiance_spectrum_to_header(
    self,
    radiance_image_header: Union[Path, str],
    irradiance_header: Optional[Union[Path, str]],
) -> None:
    """Add an irradiance spectrum to a radiance image header.

    Parameters
    ----------
    radiance_image_header : Union[Path, str]
        Path to radiance image
    irradiance_header : Union[Path, str, None]
        Path to irradiance spectrum header.
        If refl_from_mean_irrad == True for ReflectanceConverter,
        the calculated mean spectrum is used, and irradiance_header can be set to None.

    Raises
    ------
    ValueError
        If irradiance_header is None and refl_from_mean_irrad is False.
    """
    # Select irradiance source (precomputed mean or external file)
    if self.refl_from_mean_irrad:
        spectrum = self.ref_irrad_spec_mean
        spectrum_wl = self.ref_irrad_spec_wl
    elif irradiance_header is not None:
        spectrum, spectrum_wl, _ = mpu.read_envi(irradiance_header)
        spectrum = np.squeeze(spectrum)  # Flatten to 1D array
    else:
        raise ValueError("Must specify irradiance spectrum file if not using mean irradiance.")

    # Resample spectrum to image wavelengths and store it in the header
    _, image_wl = mpu.read_envi_header(radiance_image_header)
    interpolated_spectrum = self._interpolate_irrad_to_image_wl(spectrum, spectrum_wl, image_wl)
    interpolated_spectrum = mpu.irrad_si_nm_to_si_um(interpolated_spectrum)  # To W/(m2*um)
    mpu.add_header_irradiance(interpolated_spectrum, radiance_image_header)

convert_radiance_file_with_irradiance_to_reflectance(radiance_image_header, reflectance_image_header)

Convert radiance image with irradiance spectrum in header to reflectance

The irradiance information is read from the field "solar irradiance" in the header of the radiance image file. The irradiance is assumed to be in units W/(m2*um), which is standard for ENVI files, and the irradiance values are assumed to match the wavelengths of the image.

Parameters:

Name Type Description Default
radiance_image_header Union[Path, str]

Path to header file for radiance image.

required
reflectance_image_header Union[Path, str]

Path to header file for (output) reflectance image. Binary file will be saved with same name, except .hdr extension.

required
Source code in massipipe/reflectance.py
def convert_radiance_file_with_irradiance_to_reflectance(
    self,
    radiance_image_header: Union[Path, str],
    reflectance_image_header: Union[Path, str],
) -> None:
    """Convert radiance image with irradiance spectrum in header to reflectance

    The irradiance information is read from the field "solar irradiance" in the
    header of the radiance image file. The irradiance is assumed to be in units
    W/(m2*um), which is standard for ENVI files, and the irradiance values are
    assumed to match the wavelengths of the image.

    Parameters
    ----------
    radiance_image_header: Union[Path, str]
        Path to header file for radiance image.
    reflectance_image_header: Union[Path, str]
        Path to header file for (output) reflectance image.
        Binary file will be saved with same name, except .hdr extension.

    Raises
    ------
    KeyError
        If the field 'solar irradiance' is missing from the radiance header.
    ValueError
        If the irradiance spectrum length does not match the number of image bands.
    """
    # Read radiance image and header
    rad_image, rad_wl, rad_meta = mpu.read_envi(radiance_image_header)

    # Convert irradiance spectrum string to numeric array
    if "solar irradiance" in rad_meta:
        irrad_spec = mpu.header_string_to_array(rad_meta["solar irradiance"])
    else:
        raise KeyError("Field 'solar irradiance' missing from radiance header")

    # Validate explicitly rather than with assert (asserts are stripped
    # when Python runs with optimizations enabled)
    if len(irrad_spec) != len(rad_wl):
        raise ValueError(
            "Length of irradiance spectrum in header does not match "
            "number of wavelengths in radiance image"
        )

    # Limit output wavelength range
    valid_image_wl_ind = (rad_wl >= self.wl_min) & (rad_wl <= self.wl_max)
    rad_wl = rad_wl[valid_image_wl_ind]
    rad_image = rad_image[:, :, valid_image_wl_ind]
    irrad_spec = irrad_spec[valid_image_wl_ind]

    # Make irradiance spectrum compatible with image
    irrad_spec = mpu.irrad_si_um_to_uflicklike(irrad_spec)  # Convert W/(m2*um) to uW/(cm2*um)
    irrad_spec = np.expand_dims(irrad_spec, axis=(0, 1))

    # Convert to reflectance, assuming Lambertian (perfectly diffuse) surface
    refl_image = np.pi * (rad_image.astype(np.float32) / irrad_spec.astype(np.float32))

    # Spectral smoothing (optional)
    if self.smooth_spectra:
        refl_image = mpu.savitzky_golay_filter(refl_image)

    # Update header wavelength info
    wl_str = mpu.array_to_header_string(rad_wl)
    refl_meta = rad_meta
    refl_meta["wavelength"] = wl_str
    mpu.save_envi(reflectance_image_header, refl_image, refl_meta)

read_config(yaml_path)

Parse YAML config file, accepting only basic YAML tags

Source code in massipipe/config.py
def read_config(yaml_path: Union[Path, str]) -> Any:
    """Parse YAML config file, accepting only basic YAML tags.

    Uses yaml.safe_load, which rejects arbitrary Python object tags
    and returns plain Python data structures (dicts, lists, scalars).
    """
    with open(yaml_path, "r") as yaml_file:
        return yaml.safe_load(yaml_file)

export_dataset_zip(dataset_dir, quicklook_dir, radiance_dir, imudata_dir, mosaic_visualization_dir, config_file_path)

Export selected parts of dataset to a zip file.

Parameters:

Name Type Description Default
dataset_dir Path

The directory containing the dataset to be exported.

required
quicklook_dir Path

The directory containing quicklook images.

required
radiance_dir Path

The directory containing radiance data.

required
imudata_dir Path

The directory containing IMU data.

required
mosaic_visualization_dir Path

The directory containing mosaic visualizations.

required
config_file_path Path

The path to the configuration file.

required

Returns:

Type Description
Path

The path to the created zip file.

Notes

This function creates a zip archive containing the specified parts of the dataset, including quicklook images, radiance data, IMU data, mosaic visualizations, and configuration file. It also includes a README and LICENSE file generated for the dataset.

Source code in massipipe/export.py
def export_dataset_zip(
    dataset_dir: Path,
    quicklook_dir: Path,
    radiance_dir: Path,
    imudata_dir: Path,
    mosaic_visualization_dir: Path,
    config_file_path: Path,
):
    """Export selected parts of dataset to a zip file.

    Parameters
    ----------
    dataset_dir : Path
        The directory containing the dataset to be exported.
    quicklook_dir : Path
        The directory containing quicklook images.
    radiance_dir : Path
        The directory containing radiance data.
    imudata_dir : Path
        The directory containing IMU data.
    mosaic_visualization_dir : Path
        The directory containing mosaic visualizations.
    config_file_path : Path
        The path to the configuration file.

    Returns
    -------
    Path
        The path to the created zip file.

    Notes
    -----
    The archive bundles quicklooks, radiance data, IMU data, mosaic
    visualizations and the configuration file, together with README and
    LICENSE files generated on the fly for the dataset.
    """

    logger.info("---- EXPORTING DATASET TO ZIP ARCHIVE ----")

    # Generate dataset-level README / LICENSE before archiving
    readme_file_path = create_readme_file(dataset_dir)
    license_file_path = create_license_file(dataset_dir)

    # Standard folder for SeaBee processed files
    output_dir = dataset_dir / "processed"
    output_dir.mkdir(exist_ok=True)
    zip_file_path = output_dir / f"{dataset_dir.name}.zip"

    elements_to_archive = (
        quicklook_dir,
        radiance_dir,
        imudata_dir,
        mosaic_visualization_dir,
        config_file_path,
        readme_file_path,
        license_file_path,
    )
    with zipfile.ZipFile(zip_file_path, mode="w") as archive:
        for element in elements_to_archive:
            _add_element_to_archive(dataset_dir, archive, element)

    logger.info(f"Dataset exported to {zip_file_path}")

    return zip_file_path

georeferenced_hyspec_to_rgb_geotiff(hyspec_path, geotiff_path, rgb_wl=None, nodata_value=0.0)

Extract RGB bands from georeferenced hyperspectral image and save as GeoTIFF

Parameters:

Name Type Description Default
hyspec_path Union[Path, str]

Path to hyperspectral image header. Assumes that path to binary image is the same, but without .hdr suffix

required
geotiff_path Union[Path, str]

Path to (output) GeoTIFF

required
rgb_wl tuple[float, float, float]

Target wavelengths for RGB bands (closest available bands will be used)

None
Notes

A nodata value of zero is assumed for the hyperspectral image.

Source code in massipipe/georeferencing.py
def georeferenced_hyspec_to_rgb_geotiff(
    hyspec_path: Union[Path, str],
    geotiff_path: Union[Path, str],
    rgb_wl: Optional[Tuple[float, float, float]] = None,
    nodata_value: float = 0.0,
) -> None:
    """Extract RGB bands from georeferenced hyperspectral image and save as GeoTIFF

    Parameters
    ----------
    hyspec_path : Union[Path, str]
        Path to hyperspectral image header. Assumes that path to binary image is the
        same, but without .hdr suffix
    geotiff_path : Union[Path, str]
        Path to (output) GeoTIFF
    rgb_wl : tuple[float, float, float], optional
        Target wavelengths for RGB bands (closest available bands will be used).
        Defaults to (460, 550, 640) if None.
    nodata_value : float, optional
        Nodata value written to the output GeoTIFF profile (default 0.0).

    Notes
    -----
    A nodata value of zero is assumed for the hyperspectral image.
    """

    rgb_wl = (460, 550, 640) if rgb_wl is None else rgb_wl  # Default wavelengths

    hyspec_path = Path(hyspec_path)

    _, wl = mpu.read_envi_header(hyspec_path)
    wl_ind = [mpu.closest_wl_index(wl, target_wl) for target_wl in rgb_wl]
    band_names = [f"{actual_wl:.3f}" for actual_wl in wl[wl_ind]]
    with rasterio.Env():
        with rasterio.open(hyspec_path.with_suffix("")) as src:  # path to binary file
            # Read the selected bands.
            # NOTE: wl_ind holds zero-based indices into the wavelength array,
            # while rasterio band indexes are one-based - hence "band + 1".
            # (Passing the zero-based index directly reads the wrong band, and
            # raises IndexError for band 0.)
            bands_data = [src.read(band + 1) for band in wl_ind]

            # Modify profile for the output file
            profile = src.meta.copy()
            profile.update(
                {
                    "count": len(wl_ind),
                    "driver": "GTiff",
                    "dtype": bands_data[0].dtype,
                    "nodata": nodata_value,
                }
            )

            # Write the bands to a new GeoTIFF file
            with rasterio.open(geotiff_path, "w", **profile) as dst:
                for i, (band_data, band_name) in enumerate(zip(bands_data, band_names), start=1):
                    dst.write(band_data, i)
                    dst.set_band_description(i, band_name)

add_geotiff_overviews(image_path, overview_factors=(2, 4, 8, 16, 32))

Add lower-resolution overviews to a GeoTIFF image file using Rasterio.

Parameters:

Name Type Description Default
image_path Path

Path to the GeoTIFF image to which overviews will be added in-place.

required
overview_factors Sequence[int]

Downsampling factors for the overviews (default: (2, 4, 8, 16, 32)). Typically factors of 2 are used (2, 4, 8, etc.).

(2, 4, 8, 16, 32)

Raises:

Type Description
Exception

If an error occurs while building the overviews.

Source code in massipipe/mosaic.py
def add_geotiff_overviews(
    image_path: Path, overview_factors: Sequence[int] = (2, 4, 8, 16, 32)
) -> None:
    """
    Add lower-resolution overviews to a GeoTIFF image file using Rasterio.

    Overviews are built in-place: the file at image_path is opened in
    read/write mode and updated.

    Parameters
    ----------
    image_path : Path
        Path to the GeoTIFF image to which overviews will be added in-place.
    overview_factors : Sequence[int], optional
        Downsampling factors for the overviews (default: (2, 4, 8, 16, 32)).
        Typically powers of 2.

    Raises
    ------
    Exception
        If an error occurs while building the overviews (re-raised after logging).
    """
    logger.info(f"Adding overviews to mosaic {image_path.name}")
    try:
        with rasterio.open(image_path, "r+") as dataset:
            dataset.build_overviews(overview_factors)
    except Exception:
        logger.error(f"Error while adding overviews to mosaic {image_path.name}", exc_info=True)
        raise

convert_geotiff_to_8bit(input_image_path, output_image_path, lower_percentile=2, upper_percentile=98, require_positive=True)

Convert a GeoTIFF image to an 8-bit representation using percentile stretching.

Parameters:

Name Type Description Default
input_image_path Path

Path to the original GeoTIFF image.

required
output_image_path Path

Path to save the 8-bit output GeoTIFF image.

required
lower_percentile float

Lower percentile (default 2) to set the minimum scaling value.

2
upper_percentile float

Upper percentile (default 98) to set the maximum scaling value.

98
require_positive bool

If True, enforces a lower bound of zero on the scaling; defaults to True.

True

Raises:

Type Description
Exception

If an error occurs during the conversion process.

Source code in massipipe/mosaic.py
def convert_geotiff_to_8bit(
    input_image_path: Path,
    output_image_path: Path,
    lower_percentile: float = 2,
    upper_percentile: float = 98,
    require_positive: bool = True,
) -> None:
    """
    Convert a GeoTIFF image to an 8-bit representation using percentile stretching.

    Parameters
    ----------
    input_image_path : Path
        Path to the original GeoTIFF image.
    output_image_path : Path
        Path to save the 8-bit output GeoTIFF image.
        If None, the input image is overwritten in-place.
    lower_percentile : float, optional
        Lower percentile (default 2) to set the minimum scaling value.
    upper_percentile : float, optional
        Upper percentile (default 98) to set the maximum scaling value.
    require_positive : bool, optional
        If True, enforces a lower bound of zero on the scaling; defaults to True.

    Raises
    ------
    Exception
        If an error occurs during the conversion process (re-raised after logging).
    """
    no_data_value_8bit = 255  # Value 255 reserved for nodata; data scaled to 0-254

    logger.info(f"Converting {input_image_path.name} to 8-bit geotiff")
    if output_image_path is None:
        output_image_path = input_image_path

    try:
        with rasterio.open(input_image_path) as input_dataset:
            input_image = input_dataset.read()
            output_image = np.zeros(input_image.shape, dtype=np.uint8)

            for band_index in range(input_dataset.count):
                input_band = input_image[band_index]

                # Guard against datasets without a registered nodata value;
                # comparing an array to None would not mark any pixel anyway,
                # but an explicit all-False mask makes the intent clear.
                if input_dataset.nodata is not None:
                    nodata_mask = input_band == input_dataset.nodata
                else:
                    nodata_mask = np.zeros(input_band.shape, dtype=bool)

                # Determine input range to use in output
                range_min, range_max = np.percentile(
                    input_band[~nodata_mask], (lower_percentile, upper_percentile)
                )
                if require_positive:
                    range_min = max(0, range_min)

                # Scale using linear interpolation from input to output range
                output_band = np.interp(
                    input_band, (range_min, range_max), (0, no_data_value_8bit - 1)
                )

                # Transfer nodata mask
                output_band[nodata_mask] = no_data_value_8bit

                # Set output band as slice
                output_image[band_index] = output_band

            # Copy metadata, changing only data type and nodata value
            output_profile = input_dataset.profile.copy()
            output_profile["dtype"] = output_image.dtype
            output_profile["nodata"] = no_data_value_8bit

        with rasterio.open(output_image_path, "w", **output_profile) as output_dataset:
            output_dataset.write(output_image)

    except Exception:
        logger.error(f"Error while converting {input_image_path.name} to 8-bit", exc_info=True)
        raise

mosaic_geotiffs(image_paths, mosaic_path)

Merge non-rotated GeoTIFFs into a single mosaic file, and generate overviews using Rasterio.

Parameters:

Name Type Description Default
image_paths Iterable[Path]

Iterable of paths to input GeoTIFF images. Images with rotated geotransforms are not supported.

required
mosaic_path Path

Path to the output mosaic GeoTIFF file.

required

Raises:

Type Description
Exception

If an error occurs during the merging process.

Source code in massipipe/mosaic.py
def mosaic_geotiffs(image_paths: Iterable[Path], mosaic_path: Path) -> None:
    """
    Merge non-rotated GeoTIFFs into a single mosaic file, and generate overviews using
    Rasterio.

    Parameters
    ----------
    image_paths : Iterable[Path]
        Iterable of paths to input GeoTIFF images. Images with rotated geotransforms are
        not supported. Missing or unreadable images are skipped with a warning.
    mosaic_path : Path
        Path to the output mosaic GeoTIFF file.

    Raises
    ------
    Exception
        If an error occurs during the merging process (re-raised after logging).
    """
    # Open images, skipping (with a warning) any that are missing or unreadable
    images_to_merge = []
    for image_path in image_paths:
        try:
            if image_path.exists():
                images_to_merge.append(rasterio.open(image_path, "r"))
            else:
                logger.warning(f"Image {image_path} does not exist - skipping.")
        except IOError as e:
            # Include the exception detail (previously bound but unused)
            logger.warning(f"Error while reading {image_path} - skipping: {e}")

    # Merge
    try:
        rasterio.merge.merge(images_to_merge, dst_path=mosaic_path)
    except Exception:
        logger.error("Error while mosaicing geotiffs", exc_info=True)
        raise
    finally:
        # Always release the opened datasets, even if the merge fails
        for opened_image in images_to_merge:
            opened_image.close()

find_datasets(base_dir, subdir_search_strings=['0_raw', '1a_radiance'])

Find dataset paths based on expected subdirectories in dataset

Parameters:

Name Type Description Default
base_dir Path

Filesystem starting point (searching tree below this point)

required
subdir_search_strings list[str]

List with names of subdirectories that are expected to be within a dataset directory. If any of the names match, the dataset directory is included.

["0_raw", "1a_radiance"]

Returns:

Type Description
dataset_dirs

List of dataset directories matching search criteria.

Source code in massipipe/pipeline.py
def find_datasets(
    base_dir: Union[Path, str],
    subdir_search_strings: Optional[list[str]] = None,
) -> list[Path]:
    """Find dataset paths based on expected subdirectories in dataset

    Parameters
    ----------
    base_dir : Path
        Filesystem starting point (searching tree below this point)
    subdir_search_strings : list[str], optional
        List with names of subdirectories that are expected to be within a dataset
        directory. If any of the names match, the dataset directory is included.
        Defaults to ["0_raw", "1a_radiance"] if None.

    Returns
    -------
    dataset_dirs
        List of dataset directories matching search criteria.
    """
    # None sentinel instead of a mutable default argument (shared across calls)
    if subdir_search_strings is None:
        subdir_search_strings = ["0_raw", "1a_radiance"]
    base_dir = Path(base_dir)
    dataset_dirs = set()  # Use set to avoid duplicates
    for subdir_search_str in subdir_search_strings:
        dataset_dirs.update(p.parent for p in base_dir.rglob(subdir_search_str))
    return sorted(dataset_dirs)

closest_wl_index(wl_array, target_wl)

Get index in sampled wavelength array closest to target wavelength.

Parameters:

Name Type Description Default
wl_array ArrayLike

Array of wavelengths.

required
target_wl Union[float, int]

Single target wavelength.

required

Returns:

Type Description
int

Index of element in wl_array which is closest to target_wl.

Examples:

>>> closest_wl_index([420, 450, 470], 468)
2
Source code in massipipe/utils.py
def closest_wl_index(wl_array: ArrayLike, target_wl: Union[float, int]) -> int:
    """Get index in sampled wavelength array closest to target wavelength.

    Parameters
    ----------
    wl_array : ArrayLike
        Array of wavelengths.
    target_wl : Union[float, int]
        Single target wavelength.

    Returns
    -------
    int
        Index of element in wl_array which is closest to target_wl.
        Ties are broken in favor of the lower index (argmin behavior).

    Examples
    --------
    >>> closest_wl_index([420, 450, 470], 468)
    2
    """
    deviations = np.abs(np.asarray(wl_array) - target_wl)
    return int(np.argmin(deviations))

percentile_stretch_image(image, percentiles=(2, 98), saturation_value=2 ** 12 - 1)

Scale array values within percentile limits to range 0-255.

Parameters:

Name Type Description Default
image NDArray

Image, shape (n_lines, n_samples, n_bands).

required
percentiles tuple of float

Lower and upper percentiles for stretching.

(2, 98)
saturation_value int

Saturation value for the image.

2 ** 12 - 1

Returns:

Name Type Description
image_stretched NDArray, dtype=uint8

Image with same shape as input. Image intensity values are stretched so that the lower percentile corresponds to 0 and the higher percentile corresponds to 255 (maximum value for unsigned 8-bit integer, typical for PNG/JPG). Pixels for which one or more bands are saturated are set to zero.

Source code in massipipe/utils.py
def percentile_stretch_image(
    image: NDArray,
    percentiles: tuple[float, float] = (2, 98),
    saturation_value: int = 2**12 - 1,
) -> NDArray:
    """Scale array values within percentile limits to range 0-255.

    Parameters
    ----------
    image : NDArray
        Image, shape (n_lines, n_samples, n_bands).
    percentiles : tuple of float, optional
        Lower and upper percentiles for stretching.
    saturation_value : int, optional
        Saturation value for the image.

    Returns
    -------
    image_stretched : NDArray, dtype=uint8
        Image with same shape as input.
        Image intensity values are stretched so that the lower
        percentile corresponds to 0 and the higher percentile corresponds
        to 255 (maximum value for unsigned 8-bit integer, typical for PNG/JPG).
        Pixels for which one or more bands are saturated are set to zero.

    Notes
    -----
    The input image is left unmodified. (A previous version clipped each
    band in-place, silently mutating the caller's array.)
    Bands with zero percentile range are left as zeros instead of
    dividing by zero.
    """
    assert image.ndim == 3
    saturated = np.any(image >= saturation_value, axis=2)
    image_stretched = np.zeros_like(image, dtype=np.float64)

    for band_index in range(image.shape[2]):
        image_band = image[:, :, band_index]
        p_low, p_high = np.percentile(image_band[~saturated], percentiles)
        p_range = p_high - p_low
        if p_range == 0:
            continue  # Constant band: leave as zeros (avoid divide-by-zero)
        # np.clip returns a new array, so the input image is not mutated
        clipped_band = np.clip(image_band, p_low, p_high)
        image_stretched[:, :, band_index] = (clipped_band - p_low) * (255 / p_range)

    image_stretched[saturated] = 0
    return image_stretched.astype(np.uint8)

read_envi(header_path, image_path=None, write_byte_order_if_missing=True)

Load image in ENVI format, including wavelength vector and other metadata.

Parameters:

Name Type Description Default
header_path Path or str

Path to ENVI file header.

required
image_path Path or str or None

Path to ENVI data file, useful if data file is not found automatically from header file name (see spectral.io.envi.open).

None
write_byte_order_if_missing bool

Flag to indicate if the string "byte order = 0" should be written to the header file in case of MissingEnviHeaderParameter error (byte order is required by the "spectral" library, but is missing in some Resonon ENVI files).

True

Returns:

Name Type Description
image NDArray

Image, shape (n_lines, n_samples, n_channels).

wl NDArray

Wavelength vector, shape (n_channels,). None if no wavelengths listed.

metadata dict

Image metadata (ENVI header content).

Source code in massipipe/utils.py
def read_envi(
    header_path: Union[Path, str],
    image_path: Union[Path, str, None] = None,
    write_byte_order_if_missing=True,
) -> tuple[NDArray, NDArray, dict]:
    """Load image in ENVI format, including wavelength vector and other metadata.

    Parameters
    ----------
    header_path : Path or str
        Path to ENVI file header.
    image_path : Path or str or None, optional
        Path to ENVI data file, useful if data file is not found
        automatically from header file name (see spectral.io.envi.open).
    write_byte_order_if_missing : bool, optional
        Flag to indicate if the string "byte order = 0" should be written
        to the header file in case of MissingEnviHeaderParameter error
        (byte order is required by the "spectral" library, but is missing
        in some Resonon ENVI files).

    Returns
    -------
    image : NDArray
        Image, shape (n_lines, n_samples, n_channels).
    wl : NDArray
        Wavelength vector, shape (n_channels,). Empty array if no wavelengths listed.
    metadata : dict
        Image metadata (ENVI header content).
    """

    # Open image handle
    try:
        im_handle = spectral.io.envi.open(header_path, image_path)
    except spectral.io.envi.MissingEnviHeaderParameter as e:
        # Use module logger (not the root "logging" module) for consistency
        logger.debug(f"Header file has missing parameter: {header_path}")
        byte_order_missing_str = 'Mandatory parameter "byte order" missing from header file.'
        if str(e) == byte_order_missing_str and write_byte_order_if_missing:
            logger.debug('Writing "byte order = 0" to header file and retrying')
            try:
                with open(header_path, "a") as file:
                    file.write("byte order = 0\n")
            except OSError:
                logger.error(f"Error writing to header file {header_path}", exc_info=True)
                raise

            try:
                im_handle = spectral.io.envi.open(header_path, image_path)
            except Exception:
                logger.error(
                    f"Unsuccessful when reading modified header file {header_path}",
                    exc_info=True,
                )
                raise
        else:
            logger.error(f"Error reading ENVI header file {header_path}", exc_info=True)
            raise
    except Exception:
        logger.error(f"Error opening ENVI file {header_path}", exc_info=True)
        raise

    # Read wavelengths if listed in metadata, set as empty array if not
    # NOTE: Calibration files ("gain"/"offset") don't include wavelength information
    if "wavelength" in im_handle.metadata:
        wl = np.array([float(i) for i in im_handle.metadata["wavelength"]])
    else:
        wl = np.array([])

    # Read data from disk
    try:
        image = np.array(im_handle.load())  # type: ignore
    except Exception:
        logger.error(f"Error loading image data from {header_path}", exc_info=True)
        raise

    # Returns
    return (image, wl, im_handle.metadata)

read_envi_header(header_path)

Read ENVI header information and convert wavelengths to numeric array.

Parameters:

Name Type Description Default
header_path Union[Path, str]

Path to ENVI header.

required

Returns:

Name Type Description
metadata dict

All header information formatted as dict.

wl NDArray

If "wavelength" is present in header file, wavelengths are returned as numeric array. If not, an empty array is returned.

Source code in massipipe/utils.py
def read_envi_header(header_path: Union[Path, str]) -> tuple[dict, NDArray]:
    """Read ENVI header information and convert wavelengths to numeric array.

    Parameters
    ----------
    header_path : Union[Path, str]
        Path to ENVI header.

    Returns
    -------
    metadata : dict
        All header information formatted as dict.
    wl : NDArray
        If "wavelength" is present in header file, wavelengths are returned as numeric
        array. If not, an empty array is returned.
    """
    try:
        metadata = spectral.io.envi.read_envi_header(header_path)
    except Exception:
        logger.error(f"Error reading ENVI header file {header_path}", exc_info=True)
        raise

    wavelength_strings = metadata.get("wavelength")
    if wavelength_strings is None:
        return metadata, np.array([])
    return metadata, np.array([float(s) for s in wavelength_strings])

read_json(json_path)

Read data saved in JSON file.

Parameters:

Name Type Description Default
json_path Union[Path, str]

Path to JSON file.

required

Returns:

Type Description
dict

Data from JSON file.

Source code in massipipe/utils.py
def read_json(json_path: Union[Path, str]) -> dict:
    """Read data saved in JSON file.

    Parameters
    ----------
    json_path : Union[Path, str]
        Path to JSON file.

    Returns
    -------
    dict
        Data from JSON file.
    """
    try:
        with open(json_path, "r") as json_file:
            return json.load(json_file)
    except Exception:
        logger.error(f"Error reading JSON file {json_path}", exc_info=True)
        raise

rgb_subset_from_hsi(hyspec_im, hyspec_wl, rgb_target_wl=(650, 550, 450))

Extract 3 bands from hyperspectral image representing red, green, blue.

Parameters:

Name Type Description Default
hyspec_im NDArray

Hyperspectral image, shape (n_lines, n_samples, n_bands).

required
hyspec_wl NDArray

Wavelengths for each band of hyperspectral image, in nm. Shape (n_bands,).

required
rgb_target_wl ArrayLike

Wavelengths (in nm) representing red, green and blue.

(650, 550, 450)

Returns:

Name Type Description
rgb_im NDArray

3-band image representing red, green and blue color (in that order).

rgb_wl NDArray

3-element vector with wavelengths (in nm) corresponding to each band of rgb_im. Values correspond to the wavelengths in hyspec_wl that are closest to rgb_target_wl.

Source code in massipipe/utils.py
def rgb_subset_from_hsi(
    hyspec_im: NDArray, hyspec_wl: NDArray, rgb_target_wl: ArrayLike = (650, 550, 450)
) -> tuple[NDArray, NDArray]:
    """Extract 3 bands from hyperspectral image representing red, green, blue.

    Parameters
    ----------
    hyspec_im : NDArray
        Hyperspectral image, shape (n_lines, n_samples, n_bands).
    hyspec_wl : NDArray
        Wavelengths for each band of hyperspectral image, in nm.
        Shape (n_bands,).
    rgb_target_wl : ArrayLike, optional
        Wavelengths (in nm) representing red, green and blue.

    Returns
    -------
    rgb_im : NDArray
        3-band image representing red, green and blue color (in that order).
    rgb_wl : NDArray
        3-element vector with wavelengths (in nm) corresponding to
        each band of rgb_im. Values correspond to the wavelengths in
        hyspec_wl that are closest to rgb_target_wl.
    """
    # Map each target wavelength to its nearest sampled band
    band_indices = [closest_wl_index(hyspec_wl, target) for target in rgb_target_wl]  # type: ignore
    return hyspec_im[:, :, band_indices], hyspec_wl[band_indices]

save_envi(header_path, image, metadata, **kwargs)

Save ENVI file with parameters compatible with Spectronon.

Parameters:

Name Type Description Default
header_path Union[Path, str]

Path to header file. Data file will be saved in the same location and with the same name, but without the '.hdr' extension.

required
image NDArray

Hyperspectral image, shape (n_lines, n_samples, n_bands).

required
metadata dict

Dict containing (updated) image metadata. See read_envi().

required
**kwargs dict

Additional keyword arguments passed to spectral.envi.save_image.

{}
Source code in massipipe/utils.py
def save_envi(header_path: Union[Path, str], image: NDArray, metadata: dict, **kwargs) -> None:
    """Save ENVI file with parameters compatible with Spectronon.

    Parameters
    ----------
    header_path : Union[Path, str]
        Path to header file.
        Data file will be saved in the same location and with
        the same name, but without the '.hdr' extension.
    image : NDArray
        Hyperspectral image, shape (n_lines, n_samples, n_bands).
    metadata : dict
        Dict containing (updated) image metadata.
        See read_envi().
    **kwargs : dict
        Additional keyword arguments passed to spectral.envi.save_image.

    Raises
    ------
    Exception
        Any error from spectral.envi.save_image is logged and re-raised.
    """
    try:
        # force=True overwrites existing files; ext=None gives a data file
        # without an extension, matching Spectronon's expectations.
        spectral.envi.save_image(
            header_path, image, metadata=metadata, force=True, ext=None, **kwargs
        )
    except Exception:
        # logger.exception logs the message at ERROR level with traceback
        logger.exception(f"Error saving ENVI file {header_path}")
        raise

save_png(rgb_image, png_path)

Save RGB image as PNG using rasterio / GDAL.

Parameters:

Name Type Description Default
rgb_image NDArray, dtype=uint8

Image, shape (n_lines, n_samples, n_bands=3). Image bands must be ordered as RGB (band index 0 = red, band index 1 = green, band index 2 = blue). The image must already be scaled to uint8 values 0-255.

required
png_path Union[Path, str]

Path to output PNG file.

required
Source code in massipipe/utils.py
def save_png(rgb_image: NDArray[np.uint8], png_path: Union[Path, str]) -> None:
    """Save RGB image as PNG using rasterio / GDAL.

    Parameters
    ----------
    rgb_image : NDArray, dtype=uint8
        Image, shape (n_lines, n_samples, n_bands=3).
        Image bands must be ordered as RGB (band index 0 = red,
        band index 1 = green, band index 2 = blue).
        The image must already be scaled to uint8 values 0-255.
    png_path : Union[Path, str]
        Path to output PNG file.

    Raises
    ------
    ValueError
        If rgb_image is not a 3-dimensional array with exactly 3 bands.
    """
    # Explicit validation instead of assert: asserts are stripped when
    # Python runs with optimizations (-O), silently skipping the check.
    if rgb_image.ndim != 3 or rgb_image.shape[2] != 3:
        raise ValueError(
            f"Expected image with shape (n_lines, n_samples, 3), got {rgb_image.shape}"
        )
    try:
        with warnings.catch_warnings():  # Catch warnings for this block of code
            warnings.filterwarnings("ignore")  # Ignore warning about missing geotransform
            with rasterio.Env():
                with rasterio.open(
                    png_path,
                    "w",
                    driver="PNG",
                    height=rgb_image.shape[0],
                    width=rgb_image.shape[1],
                    count=3,
                    dtype="uint8",
                ) as dst:
                    # rasterio expects band-first layout (bands, rows, cols)
                    dst.write(reshape_as_raster(rgb_image))
    except Exception:
        logger.exception(f"Error saving PNG file {png_path}")
        raise

savitzky_golay_filter(image, window_length=13, polyorder=3, axis=2)

Filter hyperspectral image using Savitzky-Golay filter.

Parameters:

Name Type Description Default
image NDArray

Image array, shape (n_lines, n_samples, n_bands).

required
window_length int

Length of "local" window within which a polynomial is fitted to the data.

13
polyorder int

Order of fitted polynomial.

3
axis int

Axis along which filtering is applied.

2

Returns:

Type Description
NDArray

Filtered version of image.

Source code in massipipe/utils.py
def savitzky_golay_filter(
    image: NDArray, window_length: int = 13, polyorder: int = 3, axis: int = 2
) -> NDArray:
    """Smooth a hyperspectral image with a Savitzky-Golay filter.

    Parameters
    ----------
    image : NDArray
        Image array, shape (n_lines, n_samples, n_bands).
    window_length : int, optional
        Size of the moving window that a polynomial is fitted within.
    polyorder : int, optional
        Degree of the polynomial fitted inside each window.
    axis : int, optional
        Array axis to filter along (default is the spectral/band axis).

    Returns
    -------
    NDArray
        Filtered version of image.
    """
    # Delegate to SciPy's implementation; per-band smoothing along `axis`.
    filtered_image = savgol_filter(
        image,
        window_length=window_length,
        polyorder=polyorder,
        axis=axis,
    )
    return filtered_image

write_envi_header(header_path, metadata)

Write metadata dictionary to ENVI header (simple wrapper).

Parameters:

Name Type Description Default
header_path Union[Path, str]

Path to ENVI header file.

required
metadata dict

Metadata dictionary to write to the header file.

required
Source code in massipipe/utils.py
def write_envi_header(header_path: Union[Path, str], metadata: dict) -> None:
    """Write metadata dictionary to ENVI header (simple wrapper).

    Parameters
    ----------
    header_path : Union[Path, str]
        Path to ENVI header file.
    metadata : dict
        Metadata dictionary to write to the header file.

    Raises
    ------
    Exception
        Any error from spectral.io.envi.write_envi_header is logged
        and re-raised.
    """
    try:
        spectral.io.envi.write_envi_header(header_path, metadata)
    except Exception:
        # logger.exception logs the message at ERROR level with traceback
        logger.exception(f"Error writing ENVI header file {header_path}")
        raise