碧海苍梧

V1

2023/01/07阅读:38主题:默认主题

基于Python的多时相数据合成

文中的示例代码及数据可关注公众号回复20230105下载,公众号二维码见文末。

1 多时相数据合成

由于云覆盖、季节积雪、传感器故障等多种因素的影响,导致从遥感数据中提取的地表参数存在空间分布上的数据缺失、时间序列上的不连续等问题,严重制约了地表参数在全球变化等诸多研究领域的应用。因此,需要对遥感时间序列数据进行合成、平滑和填补处理,以生成时空连续的地表参数产品。

多时相数据合成就是依据一定的标准,选择某一时间范围内的多景相互匹配的数据中质量最好的象元值,作为该时间范围内合成结果的像元值。

2 计算思路

2.1 根据时间尺度获取数据文件路径

多时相数据一般分别存储在多个文件中,如要对这些数据进行处理必须要知道这些文件的路径。而在某些情况下,还要对这些文件的路径按照时间顺序进行排序,如在进行趋势分析的时候。因此,多时相数据合成的第一步就是要按时间顺序排列所需文件的路径。而数据的时间尺度不同、存储规则不同,读取方法也同样不同,但其中的理念是类似的,具体可见一文搞懂Python的文件路径操作

2.2 检查数据的有效性

数据的有效性是指,对于需要合成的数据来说,各个数据的行列数、空间范围和坐标系必须一致。否则,不同数据的像元无法对齐,自然也就不能进行数据合成。

from osgeo import gdal

def check_data(paths):
    '''检查所有数据文件是否横列数全部相同,同时返回其行列数'''
    ds = gdal.Open(paths[0])
    RasterXSize, RasterYSize = ds.RasterXSize, ds.RasterYSize
    gt = ds.GetGeoTransform()
    proj = ds.GetProjection()
    for i in range(1, len(paths)):
        ds = gdal.Open(paths[i])
        assert RasterXSize == ds.RasterXSize
        assert RasterYSize == ds.RasterYSize
        assert gt == ds.GetGeoTransform()
        assert proj == ds.GetProjection()
    info = {
     'RasterXSize': RasterXSize, 
     'RasterYSize': RasterYSize, 
     'shape': (RasterXSize, RasterYSize), 
     'gt': gt, 
     'proj': proj
    }
    return info

2.3 将多个数据文件转为Numpy数组

分别读取不同的数据文件的特定波段并将其结果转为Numpy数组,同时将各个数组合并为一个三维数组。读取的函数应支持分块读取,以防内存爆炸。以月尺度数据合成为例,至少需要同时读取30个栅格数据文件,假设每个栅格数据文件的横列数为7200×3600,那么合并后的Numpy数组的形状为30×7200×3600;假设数据为32为浮点型,那么至少需要消耗2.9GiB的内存。若进行的是一年十二个月的数据合成,那么消耗的内存会是月尺度消耗的12倍,很容易触发MemoryError,因此分块读取数据是一个很好的选择。

import numpy as np
from osgeo import gdal

def rasters_to_array(paths, band_num, xoff=None, yoff=None, xsize=None, ysize=None):
    '''读取所有数据文件,并转换为Numpy数组输出'''
    for i in range(len(paths)):
        path = paths[i]
        # 读取数据
        ds = gdal.Open(path)
        band = ds.GetRasterBand(band_num)
        array = band.ReadAsArray(xoff = xoff, yoff = yoff, 
                                win_xsize = xsize, win_ysize = ysize)
        # 设置数据的NoData值
        nodata = band.GetNoDataValue()
        if isinstance(nodata, type(None)):
            array = np.ma.masked_array(array, np.isnan(array))
        else:
            array = np.ma.masked_array(array, (array == nodata) | np.isnan(array))
        # 拼接数组
        if 0 == i:
            out_array = array[np.newaxis, :, :]
        else:
            out_array = np.ma.concatenate((out_array, array[np.newaxis, :, :]), axis=0)
    return out_array

2.4 多进程分块计算

为避免内存爆炸的风险,需要分块读取数据文件,再分别对每一块进行数据合成。若一块一块的处理,那么运行速度会很慢,采用多进程算法会大大加快这一进程。但需要注意的是,若设置的进程个数与分块个数相同,那么同时读入内存的数据仍是整幅影像,因此分块个数最好要大于进程数。

import numpy as np
import multiprocessing

def main(paths, band_num, processes=1, split_num=1):
    '''计算月最大值,可将数据拆分成几个部分分别计算'''
    assert split_num>=processes
    data_info = check_data(paths)
    RasterXSize, RasterYSize = data_info['shape']
    # 从x方向拆分
    x_break_points = [0] + [int(RasterXSize*i/split_num) 
          for i in range(1, split_num)] + [RasterXSize]
    chunks = []
    for i in range(len(x_break_points)-1):
        xoff = x_break_points[i]
        xsize = x_break_points[i+1] - x_break_points[i]
        yoff, ysize = 0, RasterYSize
        chunks.append((paths, band_num, xoff, yoff, xsize, ysize))
    # 多进程分别处理不同的栅格数组,processes表示进程个数
    pool = multiprocessing.Pool(processes)
    # 各个进程处理后的栅格数组整合返回为一个列表
    out_arrays = pool.map(cal_fun, chunks)
    # np.ma.concatenate表示把分割的栅格数组按拆分的轴拼接为一个
    # 若cal_fun返回结果是三维的,则按第3个轴拼接,即np.ma.concatenate(out_arrays, 2)
    return np.ma.concatenate(out_arrays, axis=1)

2.5 数据合成算法

首先读取数据文件转为Numpy的掩码数组,其形状为时间范围×行数×列数,在进行数据合成时需要对齐第0维进行操作,下面介绍一下常用的数据合成算法。

2.5.1 植被指数的最大值合成法

植被指数最大值合成法以给定时间范围内植被指数的最大值作为遥感数据选择的准则,忽略该时间范围内的无效值。植被指数通常由遥感数据的红光和近红外波段反射率的线性或非线性组合运算得到,是表征地表植被覆盖、生长状况的一个简单有效的参数。

该方法简单,易实现,已广泛应用于全球植被盖度变化监测。在实际应用中,MVC方法往往倾向于选择远离星下点观测的数据,而且对覆盖某些植被类型的云的去除效果较差

import numpy as np

def cal_fun(all_args):
    '''数据合成算法'''
    paths, band_num, xoff, yoff, xsize, ysize = all_args
    # 将栅格数据文件转为数组
    array = rasters_to_array(paths, band_num, xoff, yoff, xsize, ysize)
    # 最大值合成
    array = np.ma.max(array, axis=0)
    return array

2.5.2 波段反射率的最小值合成方法

针对MVC合成方法存在的问题,国内外许多学者提出了波段反射率值最小的遥感数据选择准则。

  • 选择蓝光波段最小值
  • 选择红光波段最小值
def cal_fun(all_args):
    '''数据合成算法'''
    paths, band_num, xoff, yoff, xsize, ysize = all_args
    # 将栅格数据文件转为数组,band_num设置为红光或蓝光所在的波段
    array = rasters_to_array(paths, band_num, nodata, xoff, yoff, xsize, ysize)
    # 最小值合成
    array = np.ma.min(array, axis=0)
    return array

在红光波段,云的反射率明显高于植被的反射率,依据红光波段反射率最小值的准则,可以减小选择受云污染的像元值的概率。但由于红光波段云影区的反射率也较低,因此导致该方法的合成结果中保留了大量受云影影响的像元值。为了消除云影的影响,一些学者提出了近红外波段第三最小值、第三最暗值等合成方法。

def cal_fun(all_args):
    '''数据合成算法'''
    paths, band_num, xoff, yoff, xsize, ysize = all_args
    # 将栅格数据文件转为数组,band_num设置为红光或蓝光所在的波段
    array = rasters_to_array(paths, band_num, nodata, xoff, yoff, xsize, ysize)
    # 第三最小值合成法
    array = np.ma.sort(array, axis=0)
    array = array[2, :, :]
    return array

2.5.3 MannKendall趋势提取

趋势提取实质上也是多时相数据合成,只是合成结果与数据本身所要反映的事物无关,只和数据的趋势有关。pymannkendall提供了很多种趋势提取算法,详情可见GitHub - mmhs013/pyMannKendall: A python package for non parametric Mann Kendall family of trend tests.

import pymannkendall as mk

def mk_(x):
    if np.any(x.mask):
        return np.NAN
    result = mk.original_test(x.data)
    return result.slope

def cal_fun(all_args):
    '''数据合成算法'''
    paths, band_num, xoff, yoff, xsize, ysize = all_args
    # 将栅格数据文件转为数组
    array = rasters_to_array(paths, band_num, xoff, yoff, xsize, ysize)
    # 趋势提取
    array = np.ma.apply_along_axis(mk_, 0, array)
    return array

以上代码在合成时只返回了趋势值slope,事实上result = mk.original_test(x.data)这一行代码时已经计算了这一时间序列的趋势、显著性等一系列指标。若需要多个指标,可在mk_函数内返回多个值,但需要注意的是分块计算的合并各块的代码也要随之而变,如下面的代码所示:

# 同时计算趋势和显著性
def mk_(x):
    if np.any(x.mask):
        return np.NAN, np.NAN
    result = mk.original_test(x.data)
    return result.slope, result.p 

# cal_fun不用改变,返回结果由二维变成三维,即(H, W) -> (2, H, W)
# 2表示mk_返回的两个结果,H,W分别是数组的高和宽
def cal_fun(all_args):
    '''数据合成算法'''
    paths, band_num, xoff, yoff, xsize, ysize = all_args
    # 将栅格数据文件转为数组
    array = rasters_to_array(paths, band_num, xoff, yoff, xsize, ysize)
    # 趋势提取
    array = np.ma.apply_along_axis(mk_, 0, array)
    return array

# cal_fun返回的数组形状不再是二维,需要改变main函数中合并分开数组的操作
# 此时拆分数组的轴所在的维数发生变化,需要按第3个轴拼接,即np.ma.concatenate(out_arrays, axis=2)

2.6 将Numpy数组转为栅格文件存储

在数据合成之后,需要保存数据合成的结果。使用GDAL创建栅格文件首先需要明确行列数、仿射变换六参数、投影等信息,因此需要使用GDAL读取合成数据中的某一个数据作为模板,据此明确这些信息。

from osgeo import gdal

def array_to_raster(out_path, array, proj, gt, dtype, nodata=None):
    '''将Numpy数组转为栅格文件保存
    Args:
     out_path: 输出路径
     array: 需要转为栅格的Numpy数组
     proj: 投影信息,WKT文本
     gt: 仿射变换六参数
     dtype: 保存的栅格数据文件的数据类型'''

    driver = gdal.GetDriverByName('GTiff')
    dst_ds = driver.Create(out_path, array.shape[1], array.shape[0], 
                            1, dtype, options=["COMPRESS=LZW"])
    dst_ds.SetGeoTransform(gt)
    dst_ds.SetProjection(proj)
    band = dst_ds.GetRasterBand(1)
    band.WriteArray(array)
    if not isinstance(nodata, type(None)):
        band.SetNoDataValue(nodata)
    dst_ds = band = None

2.7 将Numpy的数据类型转为GDAL的数据类型

Numpy的数据类型与GDAL的数据类型之间的对应关系如下表所示,根据Numpy的数据类型获取GDAL的数据类型的函数如下所示,需要注意的是某些Numpy的数据类型没有对应的GDAL的数据类型,因此需要将其转为精度更高数据类型,如Numpy的int8需转为GDAL的GDT_Int16

Numpy GDAL 描述
int16 GDT_Int16 整数(-32768 to 32767)
int32 int32 整数(-2147483648 to 2147483647)
uint8 GDT_Byte 无符号整数(0 to 255)
uint16 GDT_UInt16 无符号整数(0 to 65535)
uint32 GDT_UInt32 无符号整数(0 to 4294967295)
float32 GDT_Float32 单精度浮点数:1 个符号位,8 个指数位,23 个尾数位
float64 GDT_Float64 双精度浮点数:1 个符号位,11 个指数位,52 个尾数位
complex64 GDT_CFloat32 复数,表示双 32 位浮点数(实数部分和虚数部分)
complex128 GDT_CFloat64 复数,表示双 64 位浮点数(实数部分和虚数部分)
def numpy_to_gdal(dtype):
    numpy_dtype = ["byte""uint8""uint16""uint32"
                    "uint64""int8""int16""int32"
                    "int64""float16""float32""float64"
                    "cint16""cint32""cfloat32""cfloat64"]
    gdal_dtype = [gdal.GDT_Byte, gdal.GDT_Byte, gdal.GDT_UInt16, gdal.GDT_UInt32, 
                gdal.GDT_Float32, gdal.GDT_Int16, gdal.GDT_Int16, gdal.GDT_Int32, 
                gdal.GDT_Float32, gdal.GDT_Float32, gdal.GDT_Float32, gdal.GDT_Float64, 
                gdal.GDT_CInt16, gdal.GDT_CInt32, gdal.GDT_CFloat32, gdal.GDT_CFloat64]
    return gdal_dtype[numpy_dtype.index(dtype.name)]

3 MannKendall趋势提取示例

3.1 数据示例

本文以文章01颗粒物浓度时空变化趋势(Mann–Kendall Test)中的数据为例,整体演示上述的多时相数据处理思路。数据目录如下,数据和代码都在名为基于Python的多时相数据合成的文件夹下,tif文件夹下是各年的数据,mk.py就是进行趋势提取的代码:

+-- 基于Python的多时相数据合成
|   +-- tif
|   |   +-- 2000.tif
|   |   +-- 2001.tif
... ... ... ... ... 
|   |   +-- 2017.tif
|   |   +-- 2018.tif
|   +-- mk.py
|

3.2 全部代码

import os
import numpy as np
from glob import glob
import multiprocessing
from osgeo import gdal
import pymannkendall as mk


def check_data(paths):
    '''检查所有数据文件是否横列数全部相同,同时返回其行列数'''
    ds = gdal.Open(paths[0])
    RasterXSize, RasterYSize = ds.RasterXSize, ds.RasterYSize
    gt = ds.GetGeoTransform()
    proj = ds.GetProjection()
    for i in range(1, len(paths)):
        ds = gdal.Open(paths[i])
        assert RasterXSize == ds.RasterXSize
        assert RasterYSize == ds.RasterYSize
        assert gt == ds.GetGeoTransform()
        assert proj == ds.GetProjection()
    info = {
     'RasterXSize': RasterXSize, 
     'RasterYSize': RasterYSize, 
     'shape': (RasterXSize, RasterYSize), 
     'gt': gt, 
     'proj': proj
    }
    return info

def rasters_to_array(paths, band_num, xoff=None, yoff=None, xsize=None, ysize=None):
    '''读取所有数据文件,并转换为Numpy数组输出'''
    for i in range(len(paths)):
        path = paths[i]
        # 读取数据
        ds = gdal.Open(path)
        band = ds.GetRasterBand(band_num)
        array = band.ReadAsArray(xoff = xoff, yoff = yoff, 
                                win_xsize = xsize, win_ysize = ysize)
        # 设置数据的NoData值
        nodata = band.GetNoDataValue()
        if isinstance(nodata, type(None)):
            array = np.ma.masked_array(array, np.isnan(array))
        else:
            array = np.ma.masked_array(array, (array == nodata) | np.isnan(array))
        # 拼接数组
        if 0 == i:
            out_array = array[np.newaxis, :, :]
        else:
            out_array = np.ma.concatenate((out_array, array[np.newaxis, :, :]), axis=0)
    return out_array

def numpy_to_gdal(dtype):
    numpy_dtype = ["byte""uint8""uint16""uint32"
                    "uint64""int8""int16""int32"
                    "int64""float16""float32""float64"
                    "cint16""cint32""cfloat32""cfloat64"]
    gdal_dtype = [gdal.GDT_Byte, gdal.GDT_Byte, gdal.GDT_UInt16, gdal.GDT_UInt32, 
                gdal.GDT_Float32, gdal.GDT_Int16, gdal.GDT_Int16, gdal.GDT_Int32, 
                gdal.GDT_Float32, gdal.GDT_Float32, gdal.GDT_Float32, gdal.GDT_Float64, 
                gdal.GDT_CInt16, gdal.GDT_CInt32, gdal.GDT_CFloat32, gdal.GDT_CFloat64]
    return gdal_dtype[numpy_dtype.index(dtype.name)]

def array_to_raster(out_path, array, proj, gt, dtype, nodata=None):
    '''将Numpy数组转为栅格文件保存
    Args:
     out_path: 输出路径
     array: 需要转为栅格的Numpy数组
     proj: 投影信息,WKT文本
     gt: 仿射变换六参数
     dtype: 保存的栅格数据文件的数据类型'''

    driver = gdal.GetDriverByName('GTiff')
    dst_ds = driver.Create(out_path, array.shape[1], array.shape[0], 
                            1, dtype, options=["COMPRESS=LZW"])
    dst_ds.SetGeoTransform(gt)
    dst_ds.SetProjection(proj)
    band = dst_ds.GetRasterBand(1)
    band.WriteArray(array)
    if not isinstance(nodata, type(None)):
        band.SetNoDataValue(nodata)
    dst_ds = band = None

# 同时计算趋势和显著性
def mk_(x):
    if np.any(x.mask):
        return np.NAN, np.NAN
    result = mk.original_test(x.data)
    return result.slope, result.p 

# cal_fun不用改变,返回结果由二维变成三维,即(H, W) -> (2, H, W)
# 2表示mk_返回的两个结果,H,W分别是数组的高和宽
def cal_fun(all_args):
    '''数据合成算法'''
    paths, band_num, xoff, yoff, xsize, ysize = all_args
    # 将栅格数据文件转为数组
    array = rasters_to_array(paths, band_num, xoff, yoff, xsize, ysize)
    # 趋势提取
    array = np.ma.apply_along_axis(mk_, 0, array)
    return array

def main(paths, band_num, processes=1, split_num=1):
    '''计算月最大值,可将数据拆分成几个部分分别计算'''
    assert split_num>=processes
    data_info = check_data(paths)
    RasterXSize, RasterYSize = data_info['shape']
    # 从x方向拆分
    x_break_points = [0] + [int(RasterXSize*i/split_num) 
          for i in range(1, split_num)] + [RasterXSize]
    chunks = []
    for i in range(len(x_break_points)-1):
        xoff = x_break_points[i]
        xsize = x_break_points[i+1] - x_break_points[i]
        yoff, ysize = 0, RasterYSize
        chunks.append((paths, band_num, xoff, yoff, xsize, ysize))
    # 多进程分别处理不同的栅格数组,processes表示进程个数
    pool = multiprocessing.Pool(processes)
    # 各个进程处理后的栅格数组整合返回为一个列表
    out_arrays = pool.map(cal_fun, chunks)
    # np.ma.concatenate表示把分割的栅格数组按拆分的轴拼接为一个
    # 若cal_fun返回结果是三维的,则按第3个轴拼接,即np.ma.concatenate(out_arrays, 2)
    return np.ma.concatenate(out_arrays, axis=2)


# 存储数据文件的文件夹
root_path = './tif'
# 输出结果文件夹
out_path = './result'
# 进程个数
processes = 8
# 拆分个数
split_num = 16

if __name__ == '__main__':
 # 读取需要处理的数据路径
 year_paths = glob(os.path.join(root_path, '*.tif'))
 # 检查数据有效性并获取数据的基本信息
 data_info = check_data(year_paths)
 # 分块计算
 mk_array = main(year_paths, 1, processes=processes, split_num=split_num)
 # 设置输出栅格的nodata值
 nodata = None
 if np.issubdtype(mk_array.dtype, np.floating):
  nodata = np.nan
 else:
  nodata = np.iinfo(mk_array.dtype).max
 mk_array = mk_array.filled(nodata)
 # 设置输出栅格的数据类型
 dtype = numpy_to_gdal(mk_array.dtype)
 slopes, pvalues = mk_array
 # 保存数组至栅格文件
 slopes_path = os.path.join(out_path, 'slopes.tif')
 array_to_raster(slopes_path, slopes, data_info['proj'],
     data_info['gt'], dtype, nodata)
    
 pvalues_path = os.path.join(out_path, 'pvalues.tif')
 array_to_raster(pvalues_path, pvalues, data_info['proj'], 
     data_info['gt'], dtype, nodata)

3.3 结果展示

学习更多Python & GIS的相关知识,请移步公众号GeodataAnalysis

分类:

后端

标签:

Python

作者介绍

碧海苍梧
V1