API Reference

Datasets

download_dem(gdf, resolution, crs='EPSG:4326')

Source code in streamkit/datasets/usgs_downloader.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def download_dem(gdf, resolution, crs="EPSG:4326"):
    """Download a 3DEP DEM covering the union of the geometries in ``gdf``.

    Args:
        gdf: GeoDataFrame whose unioned geometry defines the download area.
        resolution: DEM resolution passed to py3dep.
        crs: CRS of the returned raster. Defaults to "EPSG:4326".

    Returns:
        DEM raster reprojected (bilinear) to ``crs`` if needed.
    """
    boundary = gdf.union_all()

    # MultiPolygon areas are requested by bounding box, then clipped back to
    # the original geometry after download.
    is_multipolygon = isinstance(boundary, MultiPolygon)
    request_geom = boundary.bounds if is_multipolygon else boundary

    try:
        dem = py3dep.static_3dep_dem(request_geom, resolution=resolution, crs=gdf.crs)
    except Exception:
        # large requests can fail; retry on smaller pieces
        dem = retry_on_smaller(request_geom)

    # BUG FIX: the original rebound `boundary` to its bounds tuple above, so a
    # second isinstance(boundary, MultiPolygon) check could never be True and
    # the clip was unreachable. Use the flag captured before the rebinding.
    if is_multipolygon:
        dem = dem.rio.clip([boundary], gdf.crs, drop=True, all_touched=True)

    if dem.rio.crs != crs:
        dem = dem.rio.reproject(crs, resampling=rasterio.enums.Resampling.bilinear)
    return dem

download_flowlines(gdf, layer='medium', linestring_only=True, crs='EPSG:4326')

Source code in streamkit/datasets/usgs_downloader.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def download_flowlines(gdf, layer="medium", linestring_only=True, crs="EPSG:4326"):
    """Download NHD flowlines for the area covered by ``gdf``.

    Args:
        gdf: GeoDataFrame defining the area of interest.
        layer: "medium" (NHD medium resolution) or "high" (NHDPlus HR).
        linestring_only: Whether to restrict results to LineString geometries.
        crs: CRS of the returned flowlines.

    Returns:
        GeoDataFrame of flowlines.

    Raises:
        ValueError: If ``layer`` is not "medium" or "high".
    """
    if layer == "medium":
        nhd = NHD("flowline_mr")
        flowlines = _download_flowlines(gdf, nhd, linestring_only, crs)
    elif layer == "high":
        nhd_hr = NHDPlusHR("flowline")
        flowlines = _download_flowlines(gdf, nhd_hr, linestring_only, crs)

        # If non-network flowlines exist, download and append them. Some areas
        # have none and the request raises; in that case keep the network-only
        # result. (Was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit, followed by a no-op `flowlines = flowlines`.)
        try:
            nhd_hr_non_network = NHDPlusHR("non_network_flowline")
            non_network_flowlines = _download_flowlines(
                gdf, nhd_hr_non_network, linestring_only, crs
            )
            combined = pd.concat([flowlines, non_network_flowlines], ignore_index=True)
            flowlines = gpd.GeoDataFrame(combined, crs=crs)
        except Exception:
            pass
    else:
        raise ValueError("layer must be either 'medium' or 'high'")
    return flowlines

get_huc_data(hucid, nhd_layer='medium', crs='EPSG:4326', dem_resolution=10)

Download hydrological and topographic data for a given HUC ID.

Retrieves NHD flowlines, and digital elevation model (DEM) data for the specified Hydrologic Unit Code (HUC) area.

Parameters:
  • hucid (str) –

    The Hydrologic Unit Code identifier for the watershed area.

  • nhd_layer (str, default: 'medium' ) –

    Options: "medium" or "high". Defaults to "medium"

  • crs (str, default: 'EPSG:4326' ) –

    The coordinate reference system for the output data as an EPSG code or other CRS string. Defaults to "EPSG:4326" (WGS84).

  • dem_resolution (int, default: 10 ) –

    The spatial resolution of the DEM in meters. Defaults to 10.

Returns:
  • Tuple[GeoDataFrame, DataArray]

    (dem raster, flowlines GeoDataFrame)

Source code in streamkit/datasets/usgs_downloader.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def get_huc_data(
    hucid: str,
    nhd_layer: str = "medium",
    crs: str = "EPSG:4326",
    dem_resolution: int = 10,
) -> Tuple[gpd.GeoDataFrame, xr.DataArray]:
    """Fetch flowlines and a DEM for a Hydrologic Unit Code (HUC) area.

    Args:
        hucid: The Hydrologic Unit Code identifier for the watershed area.
        nhd_layer: NHD layer to use, either "medium" or "high".
        crs: Coordinate reference system of the outputs (EPSG code or other
            CRS string). Defaults to "EPSG:4326" (WGS84).
        dem_resolution: DEM spatial resolution in meters. Defaults to 10.

    Returns:
        Tuple of (DEM raster, flowlines GeoDataFrame).
    """
    boundary = download_huc_bounds(hucid)
    streams = download_flowlines(
        boundary, layer=nhd_layer, linestring_only=True, crs=crs
    )
    elevation = download_dem(boundary, dem_resolution, crs)
    return (elevation, streams)

load_sample_data()

Source code in streamkit/datasets/sample_data.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def load_sample_data():
    """Return the sample (dem, flowlines) pair, downloading it on first use.

    Results are cached under ``CACHE_DIR`` as a GeoTIFF and a GeoPackage;
    subsequent calls read from the cache.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    dem_path = CACHE_DIR / "sample_dem.tif"
    flowlines_path = CACHE_DIR / "sample_flowlines.gpkg"

    cached = dem_path.exists() and flowlines_path.exists()
    if cached:
        print(f"Loading sample data from cache at {CACHE_DIR}...")
    else:
        print(f"Sample data not found in cache. Downloading to {CACHE_DIR}...")
        dem, flowlines = get_huc_data("1805000203", crs="EPSG:3310")
        dem.rio.to_raster(dem_path)
        flowlines.to_file(flowlines_path, driver="GPKG")

    dem = rioxarray.open_rasterio(dem_path).squeeze()
    flowlines = gpd.read_file(flowlines_path)
    return dem, flowlines

Network Extraction

heads_from_flow_accumulation(flow_acc, flow_dir, threshold_area)

Source code in streamkit/extraction/heads.py
10
11
12
13
14
15
16
17
18
19
20
21
22
def heads_from_flow_accumulation(flow_acc, flow_dir, threshold_area):
    """Locate channel heads from a flow-accumulation threshold.

    Thresholds ``flow_acc`` by ``threshold_area`` and marks the source nodes
    of the resulting stream network. Output is uint8: 1 = channel head,
    0 = background, 255 = nodata.
    """
    # convert the area threshold into a cell-count threshold (square cells)
    cell_area = np.abs(flow_acc.rio.resolution()[0]) ** 2
    min_cells = int(threshold_area / cell_area)

    streams = flow_acc >= min_cells
    heads, _, _ = find_stream_nodes(streams, flow_dir)

    head_raster = flow_acc.copy(data=np.zeros(flow_acc.shape, dtype=np.uint8))
    for head_row, head_col in heads:
        head_raster.data[head_row, head_col] = 1
    # propagate the input's nodata mask
    head_raster.data[flow_acc == flow_acc.rio.nodata] = 255

    head_raster = head_raster.rio.set_nodata(255)
    head_raster.attrs["_FillValue"] = 255
    return head_raster

heads_from_features(flowline_features, template_raster)

Source code in streamkit/extraction/heads.py
25
26
27
def heads_from_features(flowline_features, template_raster):
    """Rasterize channel-head points derived from vector flowline features."""
    head_points = find_channel_head_points(flowline_features)
    head_raster = rasterize_channel_heads(head_points, template_raster)
    return head_raster

extract_channel_network(channel_heads, flow_dir)

Source code in streamkit/extraction/network.py
 9
10
11
12
def extract_channel_network(channel_heads, flow_dir):
    """Trace streams downstream from channel heads and label the segments."""
    traced = trace_streams(channel_heads, flow_dir)
    return link_streams(traced, flow_dir)

find_stream_nodes(stream_raster, flow_directions)

Identify source, confluence, and outlet points in a stream network. The stream_raster argument is a raster representing the stream network (non-zero values indicate streams); flow_directions is a raster of flow directions using the ESRI convention. Returns a tuple containing lists of source points, confluence points, and outlet points.

Source code in streamkit/extraction/nodes.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def find_stream_nodes(
    stream_raster: xr.DataArray, flow_directions: xr.DataArray
) -> tuple:
    """Find the topological nodes of a stream network.

    Args:
        stream_raster: Stream network raster; non-zero cells are streams.
        flow_directions: Flow-direction raster (ESRI convention).

    Returns:
        Tuple of (source points, confluence points, outlet points) lists.
    """
    esri_dirmap = _make_numba_esri_dirmap()
    # the numba kernel returns the (sources, confluences, outlets) triple
    return _find_stream_nodes_numba(
        stream_raster.data, flow_directions.data, esri_dirmap
    )

delineate_subbasins(stream_raster, flow_directions, flow_accumulation)

Delineate all subbasins given a channel network raster. Detects pour points for each unique stream segment by finding the cell with the highest flow accumulation for that segment (based on ID).

Parameters:
  • stream_raster (DataArray) –

    Raster of channel network with unique IDs for each segment

  • flow_directions (DataArray) –

    Flow directions raster (ESRI d8 encoding)

  • flow_accumulation (DataArray) –

    Flow accumulation raster

Returns: Raster of subbasins with same IDs as stream_raster

Source code in streamkit/extraction/subbasins.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def delineate_subbasins(
    stream_raster: xr.DataArray,
    flow_directions: xr.DataArray,
    flow_accumulation: xr.DataArray,
) -> xr.DataArray:
    """Delineate a subbasin for every stream segment in the network.

    For each unique segment ID, the pour point is the segment cell with the
    highest flow accumulation; the upstream catchment of that cell becomes
    the subbasin.

    Args:
        stream_raster: Channel network raster with unique IDs per segment.
        flow_directions: Flow-direction raster (ESRI D8 encoding).
        flow_accumulation: Flow-accumulation raster.

    Returns:
        Subbasin raster carrying the same IDs as ``stream_raster``.
    """
    # pour points come pre-sorted so nested basins overwrite correctly
    pour_points = _identify_pour_points(stream_raster, flow_accumulation)

    subbasins = stream_raster.copy(
        data=np.zeros_like(stream_raster.data, dtype=np.int32)
    )
    pysheds_fdir, grid = to_pysheds(flow_directions)

    for _, point in pour_points.iterrows():
        catchment = grid.catchment(
            x=point["col"],
            y=point["row"],
            fdir=pysheds_fdir,
            xytype="index",
        )
        subbasins.data[catchment] = point["stream_value"]

    return subbasins

delineate_reaches(stream_raster, dem, flow_dir, flow_acc, penalty=None, min_length=500, smooth_window=None, threshold_degrees=1.0)

Segment stream networks into reaches based on slope change points.

Uses the PELT (Pruned Exact Linear Time) changepoint detection algorithm to identify distinct reaches along each stream based on slope variations. Adjacent reaches with similar slopes can be merged using the threshold parameter.

Parameters:
  • stream_raster (DataArray) –

    Labeled stream network where each unique value represents a stream segment (0 for non-stream pixels).

  • dem (DataArray) –

    Digital elevation model for slope calculations.

  • flow_dir (DataArray) –

    Flow direction raster corresponding to the DEM.

  • flow_acc (DataArray) –

    Flow accumulation raster corresponding to the DEM.

  • penalty (float | None, default: None ) –

    PELT algorithm penalty parameter. If None, automatically calculated as log(n) * variance of slope signal.

  • min_length (float, default: 500 ) –

    Minimum reach length in meters.

  • smooth_window (int | None, default: None ) –

    Window size for smoothing slope values before segmentation. If None, no smoothing is applied.

  • threshold_degrees (float, default: 1.0 ) –

    Merge adjacent reaches if slope difference is below this threshold in degrees.

Returns:
  • DataArray

    A raster where each pixel value represents a unique reach ID (0 for non-stream pixels). Reach IDs are computed as reach_number + (stream_id * 1000).

Source code in streamkit/extraction/reaches.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def delineate_reaches(
    stream_raster: xr.DataArray,
    dem: xr.DataArray,
    flow_dir: xr.DataArray,
    flow_acc: xr.DataArray,
    penalty: float | None = None,
    min_length: float = 500,
    smooth_window: int | None = None,
    threshold_degrees: float = 1.0,
) -> xr.DataArray:
    """Segment stream networks into reaches based on slope change points.

    Uses the PELT (Pruned Exact Linear Time) changepoint detection algorithm
    to identify distinct reaches along each stream based on slope variations.
    Adjacent reaches with similar slopes can be merged using the threshold parameter.

    Args:
        stream_raster: Labeled stream network where each unique value represents
            a stream segment (0 for non-stream pixels).
        dem: Digital elevation model for slope calculations.
        flow_dir: Flow direction raster corresponding to the DEM.
        flow_acc: Flow accumulation raster corresponding to the DEM.
        penalty: PELT algorithm penalty parameter. If None, automatically calculated
            as log(n) * variance of slope signal.
        min_length: Minimum reach length in meters.
        smooth_window: Window size for smoothing slope values before segmentation.
            If None, no smoothing is applied.
        threshold_degrees: Merge adjacent reaches if slope difference is below this
            threshold in degrees.

    Returns:
        A raster where each pixel value represents a unique reach ID (0 for non-stream pixels). Reach IDs are computed as reach_number + (stream_id * 1000).
    """
    # output raster: 0 = background; reach IDs encode the parent stream as
    # reach_number + stream_id * 1000
    reaches = stream_raster.copy(data=np.zeros_like(stream_raster, dtype=np.uint32))
    for stream_val in np.unique(stream_raster):
        if stream_val == 0 or np.isnan(stream_val):
            continue
        else:
            stream_mask = stream_raster == stream_val

            if np.sum(stream_mask.data) == 1:  # single pixel stream
                # changepoint detection needs more than one point; assign the
                # stream's base reach ID directly
                reach_id = np.uint32(stream_val) * 1000
                reaches.data[stream_mask.data] = reach_id
                continue

            # per-pixel table for this stream (carries row/col and reach columns)
            stream_df = _create_stream_points(stream_mask, flow_dir, flow_acc, dem)
            # roughly convert min_length in meters to number of points
            # NOTE(review): assumes square cells with x-resolution in meters — confirm
            min_size = int(min_length / flow_dir.rio.resolution()[0])
            stream_df = _pelt_reaches(
                stream_df,
                penalty=penalty,
                min_size=min_size,
                smooth_window=smooth_window,
            )
            # collapse adjacent reaches whose slopes differ less than the threshold
            stream_df = _merge_reaches_by_threshold(
                stream_df, threshold_degrees=threshold_degrees
            )
            # encode the parent stream ID into each reach value
            stream_df["reach_val"] = stream_df["reach_id"] + (
                np.uint32(stream_val) * 1000
            )
            rows, cols = stream_df["row"].values, stream_df["col"].values
            reaches.data[rows, cols] = stream_df["reach_val"].values
    return reaches

Network Analysis

vectorize_streams(stream_raster, flow_directions, flow_accumulation)

Vectorize streams from a raster to a GeoDataFrame of LineStrings. The stream_raster argument is a raster of stream segments with unique IDs; flow_directions is a raster of flow directions (ESRI D8 encoding); flow_accumulation is a raster of flow accumulation values. Returns a GeoDataFrame with LineString geometries representing the streams and a stream_id column taken from the raster values.

Source code in streamkit/network/stream_to_vector.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def vectorize_streams(
    stream_raster: xr.DataArray,
    flow_directions: xr.DataArray,
    flow_accumulation: xr.DataArray,
) -> gpd.GeoDataFrame:
    """Convert a labeled stream raster into LineString geometries.

    Args:
        stream_raster: Raster of stream segments with unique IDs.
        flow_directions: Flow-direction raster (ESRI D8 encoding).
        flow_accumulation: Flow-accumulation raster.

    Returns:
        GeoDataFrame of LineStrings with a ``stream_id`` column taken from
        the raster values.
    """
    records = []
    for sid in np.unique(stream_raster.values):
        # skip background and nodata labels
        if sid == 0 or np.isnan(sid):
            continue

        segment_mask = stream_raster == sid
        masked_acc = flow_accumulation * segment_mask.data
        masked_dir = flow_directions * segment_mask.data

        geometry = _vectorize_single_stream(segment_mask, masked_dir, masked_acc)
        if geometry is not None:
            records.append({"geometry": geometry, "stream_id": int(sid)})

    return gpd.GeoDataFrame(records, crs=stream_raster.rio.crs)

analyze_stream_network(stream_network, dem, flow_direction, flow_accumulation)

Source code in streamkit/network/analysis.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def analyze_stream_network(
    stream_network: xr.DataArray,
    dem: xr.DataArray,
    flow_direction: xr.DataArray,
    flow_accumulation: xr.DataArray,
) -> gpd.GeoDataFrame:
    """Vectorize a stream network and attach per-segment attributes.

    Adds Strahler order, upstream length, mainstem flag, mean slope, segment
    length, and contributing area to each vectorized stream segment.
    """
    streams_vec = vectorize_streams(stream_network, flow_direction, flow_accumulation)

    # attribute computation runs on the graph representation
    network_graph = vector_to_graph(streams_vec)
    network_graph = add_strahler_attribute(network_graph)
    network_graph = add_upstream_length_attribute(network_graph)
    network_graph = add_mainstem_attribute(network_graph)
    network_graph = add_mean_slope_attribute(network_graph, dem)

    result = graph_to_vector(network_graph)
    result["length"] = result.geometry.length

    # contributing area per stream segment
    result["contributing_area"] = result["stream_id"].apply(
        lambda sid: stream_contributing_area(sid, stream_network, flow_accumulation)
    )

    return result

vector_to_graph(lines)

Source code in streamkit/network/graph_conversions.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def vector_to_graph(lines: gpd.GeoDataFrame) -> nx.DiGraph:
    """Build a directed graph whose edges are the input LineStrings.

    Nodes are the rounded (x, y) endpoints of each line; edge attributes
    carry the geometry, the CRS, and all non-geometry columns of the row.
    """
    # geographic coordinates (degrees) need finer rounding than projected ones
    digits = 6 if lines.crs.is_geographic else 2

    graph = nx.DiGraph()
    for _, row in lines.iterrows():
        coords = row.geometry.coords
        head = (round(coords[0][0], digits), round(coords[0][1], digits))
        tail = (round(coords[-1][0], digits), round(coords[-1][1], digits))

        graph.add_edge(
            head,
            tail,
            crs=lines.crs,
            geometry=row.geometry,
            **row.drop("geometry").to_dict(),
        )
    return graph

graph_to_vector(G)

Source code in streamkit/network/graph_conversions.py
27
28
29
30
31
32
33
34
35
36
37
38
39
def graph_to_vector(G: nx.DiGraph) -> gpd.GeoDataFrame:
    """Convert graph edges back to a GeoDataFrame.

    Each edge becomes one row whose geometry joins the two endpoint nodes;
    the per-edge ``crs`` attribute is consumed to set the frame's CRS.
    """
    records = [
        {
            "geometry": gpd.points_from_xy([u[0], v[0]], [u[1], v[1]]).unary_union,
            **attrs,
        }
        for u, v, attrs in G.edges(data=True)
    ]
    result = gpd.GeoDataFrame(records, geometry="geometry")
    result = result.set_crs(result["crs"].iloc[0])
    return result.drop(columns=["crs"])

sample_cross_sections(xs_linestrings, point_interval)

Generate profile points along cross-section linestrings at regular intervals.

Creates evenly-spaced points along each cross-section line, measuring distances from the center point. Points are labeled as 'center', 'positive' (downstream of center), or 'negative' (upstream of center).

Parameters:
  • xs_linestrings (GeoDataFrame) –

    GeoDataFrame containing LineString geometries representing cross-sections. If 'xs_id' column is not present, sequential IDs will be automatically assigned.

  • point_interval (float) –

    Spacing between points along each cross-section, in the units of the GeoDataFrame's CRS.

Returns:
  • GeoDataFrame

    A GeoDataFrame with Point geometries containing columns: - xs_id: Cross-section identifier - side: Position relative to center ('center', 'positive', 'negative') - distance: Distance from center point (negative for upstream, positive for downstream) - geometry: Point geometry - Additional columns from input xs_linestrings are preserved

Source code in streamkit/network/cross_sections.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def sample_cross_sections(
    xs_linestrings: gpd.GeoDataFrame, point_interval: float
) -> gpd.GeoDataFrame:
    """Generate profile points along cross-section linestrings at regular intervals.

    Creates evenly-spaced points along each cross-section line, measuring distances
    from the center point. Points are labeled as 'center', 'positive' (downstream
    of center), or 'negative' (upstream of center).

    Args:
        xs_linestrings: GeoDataFrame containing LineString geometries representing
            cross-sections. If 'xs_id' column is not present, sequential IDs will
            be automatically assigned.
        point_interval: Spacing between points along each cross-section, in the
            units of the GeoDataFrame's CRS.

    Returns:
        A GeoDataFrame with Point geometries containing columns:
            - xs_id: Cross-section identifier
            - side: Position relative to center ('center', 'positive', 'negative')
            - distance: Distance from center point (negative for upstream, positive
              for downstream)
            - geometry: Point geometry
            - Additional columns from input xs_linestrings are preserved
    """

    if "xs_id" not in xs_linestrings.columns:
        # BUG FIX: work on a copy — the original added the generated 'xs_id'
        # column to the caller's GeoDataFrame in place
        xs_linestrings = xs_linestrings.copy()
        xs_linestrings["xs_id"] = np.arange(1, len(xs_linestrings) + 1)

    xs_points = []
    for xs_id, xs_group in xs_linestrings.groupby("xs_id"):
        for _, linestring in xs_group.iterrows():
            points = _points_along_linestring(
                linestring.geometry, point_interval, crs=xs_linestrings.crs
            )
            points["xs_id"] = xs_id
            # carry over every non-geometry attribute from the input row
            for col in xs_group.columns:
                if col != "geometry":
                    points[col] = linestring[col]
            xs_points.append(points)

    return gpd.GeoDataFrame(
        pd.concat(xs_points), crs=xs_linestrings.crs, geometry="geometry"
    ).reset_index(drop=True)

network_cross_sections(linestrings, interval_distance, width, linestring_ids=None, smoothed=False)

Create cross-sections at regular intervals along linestrings.

Parameters:
  • linestrings (GeoSeries) –

    Linestring geometries.

  • interval_distance (float) –

    Distance between cross-sections along the linestrings.

  • width (float) –

    Width of each cross-section.

  • linestring_ids (Optional[Sequence], default: None ) –

    Optional identifiers for each linestring. If None, the index of linestrings is used.

  • smoothed (bool, default: False ) –

    Whether to use smoothed angles for cross-sections.

Returns: cross section linestrings

Source code in streamkit/network/cross_sections.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def network_cross_sections(
    linestrings: gpd.GeoSeries,
    interval_distance: float,
    width: float,
    linestring_ids: Optional[Sequence] = None,
    smoothed: bool = False,
):
    """Create cross-sections at regular intervals along linestrings.

    Args:
        linestrings: Linestring geometries.
        interval_distance: Spacing between cross-sections along each linestring.
        width: Width of each cross-section.
        linestring_ids: Optional identifier per linestring. Defaults to the
            index of ``linestrings``.
        smoothed: Whether to use smoothed angles for cross-sections.

    Returns:
        GeoDataFrame of cross-section linestrings with a ``linestring_id``
        column and a sequential ``xs_id`` column.

    Raises:
        ValueError: If ``linestring_ids`` length does not match ``linestrings``.
    """
    if linestring_ids is None:
        linestring_ids = linestrings.index
    elif len(linestring_ids) != len(linestrings):
        raise ValueError("provided ids must match the length of linestrings")

    pieces = []
    for line_id, geometry in zip(linestring_ids, linestrings):
        sections = _create_cross_sections(
            geometry, interval_distance, width, crs=linestrings.crs, smoothed=smoothed
        )
        # wrap in a GeoDataFrame and tag with the source linestring's id
        frame = gpd.GeoDataFrame(geometry=sections, crs=linestrings.crs)
        frame["linestring_id"] = line_id
        pieces.append(frame)

    result = pd.concat(pieces)
    result["xs_id"] = np.arange(1, len(result) + 1)
    return result

Terrain Analysis

condition_dem(dem, max_retries=3, wait_time=1.0)

Condition DEM using WhiteboxTools fill_depressions with retry logic.

Source code in streamkit/terrain/accumulation.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def condition_dem(dem, max_retries=3, wait_time=1.0):
    """Condition DEM using WhiteboxTools fill_depressions with retry logic.

    Args:
        dem: DEM raster to condition.
        max_retries: Number of attempts before giving up.
        wait_time: Seconds to sleep between failed attempts.

    Returns:
        Depression-filled DEM (loaded into memory).

    Raises:
        RuntimeError: If all ``max_retries`` attempts fail.
    """
    for attempt in range(1, max_retries + 1):
        try:
            # Fresh temp dir and WBT instance per attempt. The context manager
            # removes the directory afterwards — the original used mkdtemp()
            # and leaked one directory (plus rasters) per attempt.
            with tempfile.TemporaryDirectory() as working_dir:
                wbt = whitebox.WhiteboxTools()
                wbt.set_working_dir(working_dir)
                wbt.verbose = False

                # Write DEM to disk
                dem_path = os.path.join(working_dir, "dem.tif")
                filled_path = os.path.join(working_dir, "filled_dem.tif")
                dem.rio.to_raster(dem_path)

                # Run fill depressions
                wbt.fill_depressions(dem_path, filled_path, fix_flats=True)

                # Check for output
                if not os.path.exists(filled_path) or os.path.getsize(filled_path) == 0:
                    raise FileNotFoundError("WhiteboxTools failed to produce output")

                # Load result; .load() pulls the data into memory so the
                # backing file can be deleted with the temp dir
                conditioned_dem = rxr.open_rasterio(filled_path, masked=True).squeeze()
                return conditioned_dem.load()

        except Exception as e:
            print(f"[Attempt {attempt}/{max_retries}] fill_depressions failed: {e}")
            if attempt < max_retries:
                time.sleep(wait_time)
                continue
            raise RuntimeError(
                f"condition_dem failed after {max_retries} attempts"
            ) from e

flow_accumulation_workflow(dem)

Given a DEM, compute the conditioned DEM, flow directions, and flow accumulation. Uses d8 flow directions, wraps around whiteboxtools 'fill depression with fix flats' algorithm. Flow direction and accumulation done with pysheds. Uses ESRI flow direction encoding.

Parameters:
  • dem (DataArray) –

    DEM raster

Returns: (conditioned DEM, flow directions, and flow accumulation)

Source code in streamkit/terrain/accumulation.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def flow_accumulation_workflow(
    dem: xr.DataArray,
) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]:
    """Condition a DEM and derive D8 flow directions and flow accumulation.

    Depression filling wraps WhiteboxTools' 'fill depressions with fix flats';
    flow direction and accumulation are computed with pysheds using ESRI D8
    encoding.

    Args:
        dem: DEM raster.

    Returns:
        Tuple of (conditioned DEM, flow directions, flow accumulation), each
        reprojected to match the input DEM grid.
    """
    conditioned = condition_dem(dem)
    pysheds_dem, grid = to_pysheds(conditioned)

    fdir = grid.flowdir(pysheds_dem)
    facc = grid.accumulation(fdir)

    conditioned_out = from_pysheds(pysheds_dem)
    fdir_out = from_pysheds(fdir).astype(np.int16)
    facc_out = from_pysheds(facc)

    return (
        conditioned_out.rio.reproject_match(dem),
        fdir_out.rio.reproject_match(dem),
        facc_out.rio.reproject_match(dem),
    )

create_cost_graph(dem, walls=None, enforce_uphill=False, enforce_downhill=False)

Source code in streamkit/terrain/cost.py
 6
 7
 8
 9
10
11
12
13
14
15
def create_cost_graph(dem, walls=None, enforce_uphill=False, enforce_downhill=False):
    """Build a sparse cost graph over DEM cells for least-cost routing."""
    cell_size = dem.rio.resolution()[0]
    data, ids = _create_graph_data_numba(
        dem.data,
        walls,
        enforce_uphill,
        enforce_downhill,
        cell_size=cell_size,
    )
    return csr_matrix(data, shape=(ids.size, ids.size))

compute_hand(dem, flow_directions, streams)

Source code in streamkit/terrain/elevation.py
40
41
42
43
44
45
46
47
48
49
def compute_hand(dem, flow_directions, streams):
    """Compute height above nearest drainage (HAND) with pysheds.

    Args:
        dem: Elevation raster.
        flow_directions: D8 flow-direction raster (ESRI encoding).
        streams: Stream raster; cells > 0 are treated as channels.

    Returns:
        HAND raster aligned back onto the input DEM's grid.
    """
    # note ESRI d8 flow direction encoding
    pysheds_dem, grid = to_pysheds(dem)
    flow_directions, _ = to_pysheds(flow_directions)
    streams, _ = to_pysheds(streams)
    # NOTE(review): pysheds dirmap order is (N, NE, E, SE, S, SW, W, NW); the
    # standard ESRI values in that order are (64, 128, 1, 2, 4, 8, 16, 32).
    # The last four entries here are reversed — confirm this is intentional
    # (e.g., to compensate for raster row order in to_pysheds).
    dirmap = (64, 128, 1, 2, 32, 16, 8, 4)
    hand = grid.compute_hand(flow_directions, pysheds_dem, streams > 0, dirmap=dirmap)
    hand = from_pysheds(hand)
    # align output grid back to the input DEM
    hand = hand.rio.reproject_match(dem)
    return hand

compute_hand_wbt(conditioned_dem, streams)

Source code in streamkit/terrain/elevation.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def compute_hand_wbt(conditioned_dem, streams):
    """Compute height above nearest drainage (HAND) with WhiteboxTools.

    Args:
        conditioned_dem: Depression-filled DEM raster.
        streams: Stream raster.

    Returns:
        HAND raster aligned to ``conditioned_dem``.

    Raises:
        FileNotFoundError: If WhiteboxTools produces no output.
    """
    # TemporaryDirectory cleans up the scratch rasters; the original used
    # mkdtemp() and left the directory (and three GeoTIFFs) on disk
    with tempfile.TemporaryDirectory() as working_dir:
        wbt = whitebox.WhiteboxTools()
        wbt.set_working_dir(working_dir)
        wbt.verbose = False

        dem_path = os.path.join(working_dir, "conditioned_dem.tif")
        streams_path = os.path.join(working_dir, "streams.tif")
        hand_path = os.path.join(working_dir, "hand.tif")
        conditioned_dem.rio.to_raster(dem_path)
        streams.rio.to_raster(streams_path)

        wbt.elevation_above_stream(dem_path, streams_path, hand_path)

        if not os.path.exists(hand_path) or os.path.getsize(hand_path) == 0:
            raise FileNotFoundError("WhiteboxTools failed to produce HAND output")
        # load into memory before the temp dir (and backing file) is removed
        hand = rxr.open_rasterio(hand_path, masked=True).squeeze().load()

    hand = hand.rio.reproject_match(conditioned_dem)
    return hand

compute_rem(dem, stream_mask, k=5, fit_regression=False, dist_method='cost-distance')

Source code in streamkit/terrain/elevation.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def compute_rem(
    dem,
    stream_mask,
    k=5,
    fit_regression=False,
    dist_method="cost-distance",
):
    """Compute a relative elevation model: DEM minus an interpolated stream surface.

    Args:
        dem: Elevation raster.
        stream_mask: Boolean raster marking stream cells.
        k: Number of nearest stream points used in the IDW interpolation.
        fit_regression: If True, fit stream elevations before interpolating.
        dist_method: 'euclidean' or 'cost-distance'.

    Raises:
        ValueError: If ``dist_method`` is not recognized.
    """
    if dist_method == "euclidean":
        dists, inds = find_nearest_stream_pts_euclidean(dem, stream_mask, k)
    elif dist_method == "cost-distance":
        graph = create_cost_graph(dem)
        dists, inds = find_nearest_stream_pts_cost_distance(dem, stream_mask, graph, k)
    else:
        raise ValueError("dist_method must be 'euclidean' or 'cost-distance'")

    rows, cols = np.where(stream_mask.values)
    elevations = dem.values[rows, cols]
    if fit_regression:
        elevations = fit_stream_elevations(elevations)

    # inverse-distance-weighted stream-surface estimate beneath every cell
    surface = idw_stream_surface(elevations, dists, inds, power=1)

    return dem - surface

subbasin_rem(dem, channel_network, subbasins, k=5, fit_regression=False, dist_method='cost-distance')

Source code in streamkit/terrain/elevation.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def subbasin_rem(
    dem,
    channel_network,
    subbasins,
    k=5,
    fit_regression=False,
    dist_method="cost-distance",
):
    """Compute a REM independently within each subbasin and mosaic the results.

    Args:
        dem: Elevation raster.
        channel_network: Stream raster; cells > 0 are channels.
        subbasins: Raster of subbasin IDs (0 = background).
        k: Passed through to ``compute_rem``.
        fit_regression: Passed through to ``compute_rem``.
        dist_method: Passed through to ``compute_rem``.
    """
    mosaic = dem.copy(data=np.full(dem.shape, np.nan))
    for basin_id in np.unique(subbasins.values):
        # skip background and nodata labels
        if basin_id == 0 or np.isnan(basin_id):
            continue

        basin_mask = subbasins == basin_id
        basin_dem = dem.where(basin_mask)
        basin_streams = channel_network.where(basin_mask).fillna(0) > 0

        basin_rem = compute_rem(
            basin_dem,
            basin_streams,
            k=k,
            fit_regression=fit_regression,
            dist_method=dist_method,
        )
        # paste this subbasin's REM into the mosaic
        mosaic = mosaic.where(~basin_mask, basin_rem)
    return mosaic

gaussian_smooth_raster(raster, spatial_radius, sigma)

Apply Gaussian smoothing to a raster while preserving NaN values.

Performs NaN-aware Gaussian filtering that conserves intensity by only redistributing values between non-NaN pixels. NaN pixels remain NaN in the output.

Parameters:
  • raster (DataArray) –

    Input raster to smooth.

  • spatial_radius (float) –

    Radius of the Gaussian kernel in map units (e.g., meters). Converted internally to pixels based on raster resolution.

  • sigma (float) –

    Standard deviation of the Gaussian kernel in pixels. Controls the strength of smoothing.

Returns:
  • DataArray

    Smoothed raster with the same dimensions, coordinates, and NaN pattern as the input.

Source code in streamkit/terrain/smoothing.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def gaussian_smooth_raster(
    raster: xr.DataArray, spatial_radius: float, sigma: float
) -> xr.DataArray:
    """Gaussian-smooth a raster with NaN-aware, intensity-conserving filtering.

    Values are redistributed only among non-NaN pixels, so NaN cells stay
    NaN in the output.

    Args:
        raster: Input raster to smooth.
        spatial_radius: Kernel radius in map units (e.g., meters); converted
            to pixels from the raster resolution.
        sigma: Gaussian standard deviation in pixels (smoothing strength).

    Returns:
        Smoothed raster with the same dimensions, coordinates, and NaN
        pattern as the input.
    """
    pixel_size = raster.rio.resolution()[0]
    kernel_radius = int(round(spatial_radius / pixel_size))

    smoothed = raster.copy(deep=True)
    smoothed.data = _filter_nan_gaussian_conserving(
        smoothed.data, kernel_radius, sigma
    )
    return smoothed

compute_distance_to_head(streams, flow_direction)

For each cell in the stream raster, compute the maximum upstream length (distance to furthest headwater)

Parameters:
  • streams (DataArray) –

    A binary raster where stream cells are 1 and non-stream cells are 0.

  • flow_direction (DataArray) –

    A raster representing flow direction using ESRI convention.

Returns: A raster where each stream cell contains the maximum upstream length in map units.

Source code in streamkit/terrain/distance_to_head.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def compute_distance_to_head(
    streams: xr.DataArray, flow_direction: xr.DataArray
) -> xr.DataArray:
    """Compute, per stream cell, the distance to its furthest headwater.

    Args:
        streams: Binary raster; stream cells are 1, non-stream cells are 0.
        flow_direction: Flow-direction raster (ESRI convention).

    Returns:
        Raster of maximum upstream length per stream cell, in map units.
    """
    esri_dirmap = _make_numba_esri_dirmap()
    head_points, _, _ = find_stream_nodes(streams, flow_direction)

    # cell-count distances from the numba kernel, scaled to map units below
    distances = _distance_from_head(
        streams.data, head_points, flow_direction.data, esri_dirmap
    )
    distance_raster = flow_direction.copy(data=distances)
    distance_raster *= np.abs(flow_direction.rio.resolution()[0])
    return distance_raster