API Reference

Watershed Analysis

compute_hand(dem, flow_directions, streams)

Source code in streamkit/watershed.py
74
75
76
77
78
79
80
81
82
83
def compute_hand(dem, flow_directions, streams):
    # note ESRI d8 flow direction encoding
    pysheds_dem, grid = to_pysheds(dem)
    flow_directions, _ = to_pysheds(flow_directions)
    streams, _ = to_pysheds(streams)
    dirmap = (64, 128, 1, 2, 32, 16, 8, 4)
    hand = grid.compute_hand(flow_directions, pysheds_dem, streams > 0, dirmap=dirmap)
    hand = from_pysheds(hand)
    hand = hand.rio.reproject_match(dem)
    return hand

flow_accumulation_workflow(dem)

Given a DEM, compute the conditioned DEM, flow directions, and flow accumulation. Uses d8 flow directions, wraps around whiteboxtools 'fill depression with fix flats' algorithm. Flow direction and accumulation done with pysheds. Uses ESRI flow direction encoding.

Parameters:
  • dem (DataArray) –

    DEM raster

Returns: (conditioned DEM, flow directions, and flow accumulation)

Source code in streamkit/watershed.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def flow_accumulation_workflow(
    dem: xr.DataArray,
) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]:
    """
    Given a DEM, compute the conditioned DEM, flow directions, and flow
    accumulation. Uses d8 flow directions, wraps around whiteboxtools 'fill
    depression with fix flats' algorithm. Flow direction and accumulation done
    with pysheds. Uses ESRI flow direction encoding.

    Args:
        dem: DEM raster
    Returns:
        (conditioned DEM, flow directions, and flow accumulation)

    """
    # wbt condition
    conditioned_dem = condition_dem(dem)
    pysheds_conditioned_dem, grid = to_pysheds(conditioned_dem)
    flow_directions = grid.flowdir(pysheds_conditioned_dem)
    flow_accumulation = grid.accumulation(flow_directions)

    cdem = from_pysheds(pysheds_conditioned_dem)
    flow_directions = from_pysheds(flow_directions)
    flow_directions = flow_directions.astype(np.int16)
    flow_accumulation = from_pysheds(flow_accumulation)
    return (
        cdem.rio.reproject_match(dem),
        flow_directions.rio.reproject_match(dem),
        flow_accumulation.rio.reproject_match(dem),
    )

delineate_subbasins(stream_raster, flow_directions, flow_accumulation)

Delineate all subbasins given a channel network raster. Detects pour points for each unique stream segment by finding the cell with the highest flow accumulation for that segment (based on ID).

Parameters:
  • stream_raster (DataArray) –

    Raster of channel network with unique IDs for each segment

  • flow_directions (DataArray) –

    Flow directions raster (ESRI d8 encoding)

  • flow_accumulation (DataArray) –

    Flow accumulation raster

Returns: Raster of subbasins with same IDs as stream_raster

Source code in streamkit/watershed.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def delineate_subbasins(
    stream_raster: xr.DataArray,
    flow_directions: xr.DataArray,
    flow_accumulation: xr.DataArray,
) -> xr.DataArray:
    """
    Delineate all subbasins given a channel network raster. Detects pour points
    for each unique stream segment by finding the cell with the highest flow
    accumulation for that segment (based on ID).

    Args:
        stream_raster: Raster of channel network with unique IDs for each
            segment
        flow_directions: Flow directions raster (ESRI d8 encoding)
        flow_accumulation: Flow accumulation raster
    Returns:
        Raster of subbasins with same IDs as stream_raster
    """
    # get pour points from channel network raster
    # these are sorted so that nested basins are handled correctly
    pour_points = _identify_pour_points(stream_raster, flow_accumulation)

    subbasins = stream_raster.copy(
        data=np.zeros_like(stream_raster.data, dtype=np.int32)
    )

    pysheds_fdir, grid = to_pysheds(flow_directions)

    for _, row in pour_points.iterrows():
        pour_row = row["row"]
        pour_col = row["col"]

        catchment = grid.catchment(
            x=pour_col,
            y=pour_row,
            fdir=pysheds_fdir,
            xytype="index",
        )

        subbasins.data[catchment] = row["stream_value"]

    return subbasins

Stream Vectorization and Network Conversion

vectorize_streams(stream_raster, flow_directions, flow_accumulation)

Vectorize streams from a raster to a GeoDataFrame of LineStrings. Args: stream_raster: A raster of stream segments with unique IDs. flow_directions: A raster of flow directions (ESRI D8 encoding). flow_accumulation: A raster of flow accumulation values. Returns: A GeoDataFrame with LineString geometries representing the streams with stream_id column (from the raster values).

Source code in streamkit/vectorize_streams.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def vectorize_streams(
    stream_raster: xr.DataArray,
    flow_directions: xr.DataArray,
    flow_accumulation: xr.DataArray,
) -> gpd.GeoDataFrame:
    """
    Vectorize streams from a raster to a GeoDataFrame of LineStrings.
    Args:
        stream_raster: A raster of stream segments with unique IDs.
        flow_directions: A raster of flow directions (ESRI D8 encoding).
        flow_accumulation: A raster of flow accumulation values.
    Returns:
        A GeoDataFrame with LineString geometries representing the streams with stream_id column (from the raster values).
    """
    flowlines = []
    for stream_id in np.unique(stream_raster.values):
        if stream_id == 0 or np.isnan(stream_id):
            continue

        stream_mask = stream_raster == stream_id
        flow_acc = flow_accumulation * stream_mask.data
        flow_dir = flow_directions * stream_mask.data

        # skip any empty streams or those with one cell
        if np.sum(stream_mask) < 2:
            continue

        line = _vectorize_single_stream(stream_mask, flow_dir, flow_acc)
        flowlines.append({"geometry": line, "stream_id": int(stream_id)})

    gdf = gpd.GeoDataFrame(flowlines, crs=stream_raster.rio.crs)
    return gdf

vector_streams_to_networkx(lines)

Convert a GeoDataFrame of LineString geometries to a NetworkX directed graph.

Creates a directed graph where nodes represent stream endpoints (start and end coordinates) and edges represent stream segments. All attributes from the input GeoDataFrame are preserved as edge attributes in the graph.

Parameters:
  • lines (GeoDataFrame) –

    GeoDataFrame containing LineString geometries representing stream segments, along with any associated attributes.

Returns:
  • DiGraph

    A directed graph where edges contain the original geometry, CRS, and all other attributes from the input GeoDataFrame.

Source code in streamkit/nx_convert.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def vector_streams_to_networkx(lines: gpd.GeoDataFrame) -> nx.DiGraph:
    """Convert a GeoDataFrame of LineString geometries to a NetworkX directed graph.

    Creates a directed graph where nodes represent stream endpoints (start and end
    coordinates) and edges represent stream segments. All attributes from the input
    GeoDataFrame are preserved as edge attributes in the graph.

    Args:
        lines: GeoDataFrame containing LineString geometries representing stream
            segments, along with any associated attributes.

    Returns:
        A directed graph where edges contain the original geometry, CRS, and all other attributes from the input GeoDataFrame.
    """
    G = nx.DiGraph()
    for _, line in lines.iterrows():
        start = line.geometry.coords[0]
        end = line.geometry.coords[-1]
        G.add_edge(
            start,
            end,
            crs=lines.crs,
            geometry=line.geometry,
            **line.drop("geometry").to_dict(),
        )
    return G

networkx_to_gdf(G)

Convert a NetworkX directed graph back to a GeoDataFrame.

Reconstructs vector stream data from a graph representation by converting each edge into a LineString geometry connecting its start and end nodes. All edge attributes are preserved in the output GeoDataFrame.

Parameters:
  • G (DiGraph) –

    A directed graph representing stream networks. Edges must contain 'crs' and 'geometry' attributes, typically created by vector_streams_to_networkx().

Returns:
  • GeoDataFrame

    A GeoDataFrame with LineString geometries representing stream segments and all edge attributes from the graph (excluding the 'crs' attribute which is set as the GeoDataFrame's CRS).

Source code in streamkit/nx_convert.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def networkx_to_gdf(G: nx.DiGraph) -> gpd.GeoDataFrame:
    """Convert a NetworkX directed graph back to a GeoDataFrame.

    Reconstructs vector stream data from a graph representation by converting
    each edge into a LineString geometry connecting its start and end nodes.
    All edge attributes are preserved in the output GeoDataFrame.

    Args:
        G: A directed graph representing stream networks. Edges must contain
            'crs' and 'geometry' attributes, typically created by
            vector_streams_to_networkx().

    Returns:
        A GeoDataFrame with LineString geometries representing stream segments and all edge attributes from the graph (excluding the 'crs' attribute which is set as the GeoDataFrame's CRS).
    """
    edges = []
    for u, v, data in G.edges(data=True):
        edges.append(
            {
                "geometry": gpd.points_from_xy([u[0], v[0]], [u[1], v[1]]).unary_union,
                **data,
            }
        )
    gdf = gpd.GeoDataFrame(edges, geometry="geometry")
    gdf = gdf.set_crs(gdf["crs"].iloc[0])
    gdf = gdf.drop(columns=["crs"])
    return gdf

Network Analysis

strahler_order(G)

Calculate the Strahler order for each edge in a directed graph.

Parameters:
  • G (DiGraph) –

    A directed graph representing a river network. Use 'vector_streams_to_networkx()' to create this from vector data.

Returns: The same directed graph with an additional 'strahler' attribute on each edge indicating its Strahler order.

Source code in streamkit/strahler.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def strahler_order(G: nx.DiGraph) -> nx.DiGraph:
    """Calculate the Strahler order for each edge in a directed graph.

    Args:
        G: A directed graph representing a river network. Use 'vector_streams_to_networkx()' to create this from vector data.
    Returns:
        The same directed graph with an additional 'strahler' attribute on each edge indicating its Strahler order.
    """
    # first, find all root nodes
    # then, for each subgraph, find the strahler order of each edge
    G = G.copy()
    roots = [n for n in G.nodes if G.out_degree(n) == 0]
    for root in roots:
        _calculate_strahler(G, root)
    return G

upstream_length(G)

Compute the maximum upstream length for each edge in a directed graph G. Uses the length attribute of the edge geometry.

Parameters:
  • G (DiGraph) –

    A directed graph where edges have a 'geometry' attribute (shapely LineString).

Returns: A copy of the graph with an additional attribute 'max_upstream_length' for each edge.

Source code in streamkit/upstream_length.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def upstream_length(G: nx.DiGraph) -> nx.DiGraph:
    """
    Compute the maximum upstream length for each edge in a directed graph G. Uses the length attribute of the edge geometry.

    Args:
        G: A directed graph where edges have a 'geometry' attribute (shapely LineString).
    Returns:
        A copy of the graph with an additional attribute 'max_upstream_length' for each edge.
    """
    # Compute the maximum upstream length for each edge in a directed graph G.
    # confirm that all edges have a 'geometry' attribute
    G = G.copy()
    if not all("geometry" in G.edges[e] for e in G.edges):
        raise ValueError(
            "All edges must have a 'geometry' attribute. Cannot compute upstream length."
        )

    for u, v, d in G.edges(data=True):
        d["max_upstream_length"] = 0.0

    for node in nx.topological_sort(G):
        upstream = list(G.in_edges(node, data=True))

        if len(upstream) == 0:
            # this is headwater, set length to the length of the edge.geometry
            out_edges = list(G.out_edges(node, data=True))
            if len(out_edges) == 1:
                _, _, out_data = out_edges[0]
                length = out_data.get("geometry").length
        elif len(upstream) == 1:
            u, v, data = upstream[0]
            length = data.get("max_upstream_length") + data.get("geometry").length
        else:
            max_upstream_length = 0.0
            for u, v, data in upstream:
                upstream_length = (
                    data.get("max_upstream_length") + data.get("geometry").length
                )
                if upstream_length > max_upstream_length:
                    max_upstream_length = upstream_length
            length = max_upstream_length

        for _, v, out_data in G.out_edges(node, data=True):
            out_data["max_upstream_length"] = length
    return G

label_mainstem(G)

Label mainstem edges in a stream network graph.

Identifies and labels the main channel (mainstem) of each drainage network by traversing from outlet nodes upstream. Edges are labeled with a 'mainstem' attribute set to True for mainstem edges and False for tributaries.

Parameters:
  • G (DiGraph) –

    A directed graph representing a stream network. Each edge must have 'strahler' and 'max_upstream_length' attributes. If these are missing, run strahler_order() and upstream_length() first.

Returns:
  • DiGraph

    A copy of the input graph with edges labeled with a 'mainstem' boolean attribute indicating whether each edge is part of the main channel.

Raises:
  • ValueError

    If any edge is missing the required 'strahler' or 'max_upstream_length' attributes.

Source code in streamkit/mainstem.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def label_mainstem(G: nx.DiGraph) -> nx.DiGraph:
    """Label mainstem edges in a stream network graph.

    Identifies and labels the main channel (mainstem) of each drainage network
    by traversing from outlet nodes upstream. Edges are labeled with a 'mainstem'
    attribute set to True for mainstem edges and False for tributaries.

    Args:
        G: A directed graph representing a stream network. Each edge must have
            'strahler' and 'max_upstream_length' attributes. If these are missing,
            run strahler_order() and upstream_length() first.

    Returns:
        A copy of the input graph with edges labeled with a 'mainstem' boolean attribute indicating whether each edge is part of the main channel.

    Raises:
        ValueError: If any edge is missing the required 'strahler' or
            'max_upstream_length' attributes.
    """
    G = G.copy()

    # make sure that edges have a 'strahler' attribute
    if not all("strahler" in G.edges[e] for e in G.edges):
        raise ValueError(
            "All edges must have a 'strahler' attribute. Run strahler_order() first."
        )
    if not all("max_upstream_length" in G.edges[e] for e in G.edges):
        raise ValueError(
            "All edges must have a 'max_upstream_length' attribute. Run upstream_length() first."
        )

    # initialize all edges as not mainstem_edge
    for e in G.edges:
        G.edges[e]["mainstem"] = False

    roots = [n for n in G.nodes if G.out_degree(n) == 0]
    for root in roots:
        _label_mainstem(G, root)
    return G

network_cross_sections(linestrings, interval_distance, width, linestring_ids=None, smoothed=False)

Create cross-sections at regular intervals along linestrings.

Parameters:
  • linestrings (GeoSeries) –

    Linestring geometries.

  • interval_distance (float) –

    Distance between cross-sections along the linestrings.

  • width (float) –

    Width of each cross-section.

  • linestring_ids (Optional[Sequence], default: None ) –

    Optional identifiers for each linestring. If None, the index of linestrings is used.

  • smoothed (bool, default: False ) –

    Whether to use smoothed angles for cross-sections.

Returns: cross section linestrings

Source code in streamkit/xs.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def network_cross_sections(
    linestrings: gpd.GeoSeries,
    interval_distance: float,
    width: float,
    linestring_ids: Optional[Sequence] = None,
    smoothed: bool = False,
):
    """
    Create cross-sections at regular intervals along linestrings.

    Args:
        linestrings: Linestring geometries.
        interval_distance: Distance between cross-sections along the linestrings.
        width: Width of each cross-section.
        linestring_ids: Optional identifiers for each linestring. If None, the index of linestrings is used.
        smoothed: Whether to use smoothed angles for cross-sections.
    Returns:
        cross section linestrings
    """
    if linestring_ids is None:
        linestring_ids = linestrings.index
    else:
        if len(linestring_ids) != len(linestrings):
            raise ValueError("provided ids must match the length of linestrings")

    xsections = []
    for cid, linestring in zip(linestring_ids, linestrings):
        channel_xsections = _create_cross_sections(
            linestring, interval_distance, width, crs=linestrings.crs, smoothed=smoothed
        )
        # convert to DataFrame and add segment_id
        channel_xsections = gpd.GeoDataFrame(
            geometry=channel_xsections, crs=linestrings.crs
        )
        channel_xsections["linestring_id"] = cid
        xsections.append(channel_xsections)
    xsections = pd.concat(xsections)
    xsections["xs_id"] = np.arange(1, len(xsections) + 1)
    return xsections

sample_cross_sections(xs_linestrings, point_interval)

Generate profile points along cross-section linestrings at regular intervals.

Creates evenly-spaced points along each cross-section line, measuring distances from the center point. Points are labeled as 'center', 'positive' (downstream of center), or 'negative' (upstream of center).

Parameters:
  • xs_linestrings (GeoDataFrame) –

    GeoDataFrame containing LineString geometries representing cross-sections. If 'xs_id' column is not present, sequential IDs will be automatically assigned.

  • point_interval (float) –

    Spacing between points along each cross-section, in the units of the GeoDataFrame's CRS.

Returns:
  • GeoDataFrame

    A GeoDataFrame with Point geometries containing columns: - xs_id: Cross-section identifier - side: Position relative to center ('center', 'positive', 'negative') - distance: Distance from center point (negative for upstream, positive for downstream) - geometry: Point geometry - Additional columns from input xs_linestrings are preserved

Source code in streamkit/profile.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def sample_cross_sections(
    xs_linestrings: gpd.GeoDataFrame, point_interval: float
) -> gpd.GeoDataFrame:
    """Generate profile points along cross-section linestrings at regular intervals.

    Creates evenly-spaced points along each cross-section line, measuring distances
    from the center point. Points are labeled as 'center', 'positive' (downstream
    of center), or 'negative' (upstream of center).

    Args:
        xs_linestrings: GeoDataFrame containing LineString geometries representing
            cross-sections. If 'xs_id' column is not present, sequential IDs will
            be automatically assigned.
        point_interval: Spacing between points along each cross-section, in the
            units of the GeoDataFrame's CRS.

    Returns:
        A GeoDataFrame with Point geometries containing columns:
            - xs_id: Cross-section identifier
            - side: Position relative to center ('center', 'positive', 'negative')
            - distance: Distance from center point (negative for upstream, positive
              for downstream)
            - geometry: Point geometry
            - Additional columns from input xs_linestrings are preserved
    """

    if "xs_id" not in xs_linestrings.columns:
        xs_linestrings["xs_id"] = np.arange(1, len(xs_linestrings) + 1)

    xs_points = []
    for xs_id, xs_linestring in xs_linestrings.groupby("xs_id"):
        for _, linestring in xs_linestring.iterrows():
            points = _points_along_linestring(
                linestring.geometry, point_interval, crs=xs_linestrings.crs
            )
            points["xs_id"] = xs_id
            # add other columns from xs_linestrings
            for col in xs_linestring.columns:
                if col != "geometry":
                    points[col] = linestring[col]
            xs_points.append(points)

    return gpd.GeoDataFrame(
        pd.concat(xs_points), crs=xs_linestrings.crs, geometry="geometry"
    ).reset_index(drop=True)

Terrain Analysis

gaussian_smooth_raster(raster, spatial_radius, sigma)

Apply Gaussian smoothing to a raster while preserving NaN values.

Performs NaN-aware Gaussian filtering that conserves intensity by only redistributing values between non-NaN pixels. NaN pixels remain NaN in the output.

Parameters:
  • raster (DataArray) –

    Input raster to smooth.

  • spatial_radius (float) –

    Radius of the Gaussian kernel in map units (e.g., meters). Converted internally to pixels based on raster resolution.

  • sigma (float) –

    Standard deviation of the Gaussian kernel in pixels. Controls the strength of smoothing.

Returns:
  • DataArray

    Smoothed raster with the same dimensions, coordinates, and NaN pattern as the input.

Source code in streamkit/smooth.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def gaussian_smooth_raster(
    raster: xr.DataArray, spatial_radius: float, sigma: float
) -> xr.DataArray:
    """Apply Gaussian smoothing to a raster while preserving NaN values.

    Performs NaN-aware Gaussian filtering that conserves intensity by only
    redistributing values between non-NaN pixels. NaN pixels remain NaN in
    the output.

    Args:
        raster: Input raster to smooth.
        spatial_radius: Radius of the Gaussian kernel in map units (e.g., meters).
            Converted internally to pixels based on raster resolution.
        sigma: Standard deviation of the Gaussian kernel in pixels. Controls
            the strength of smoothing.

    Returns:
        Smoothed raster with the same dimensions, coordinates, and NaN pattern as the input.
    """
    resolution = raster.rio.resolution()[0]
    radius_pixels = int(round(spatial_radius / resolution))

    raster_copy = raster.copy(deep=True)
    raster_copy.data = _filter_nan_gaussian_conserving(
        raster_copy.data, radius_pixels, sigma
    )
    return raster_copy

upstream_length_raster(streams, flow_direction)

For each cell in the stream raster, compute the maximum upstream length

Parameters:
  • streams (DataArray) –

    A binary raster where stream cells are 1 and non-stream cells are 0.

  • flow_direction (DataArray) –

    A raster representing flow direction using ESRI convention.

Returns: A raster where each stream cell contains the maximum upstream length in map units.

Source code in streamkit/upstream_length.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def upstream_length_raster(
    streams: xr.DataArray, flow_direction: xr.DataArray
) -> xr.DataArray:
    """
    For each cell in the stream raster, compute the maximum upstream length

    Args:
        streams: A binary raster where stream cells are 1 and non-stream cells are 0.
        flow_direction: A raster representing flow direction using ESRI convention.
    Returns:
        A raster where each stream cell contains the maximum upstream length in map units.
    """
    dirmap = _make_numba_esri_dirmap()
    sources, _, _ = find_stream_nodes(streams, flow_direction)
    distance_arr = _distance_from_head(
        streams.data, sources, flow_direction.data, dirmap
    )
    distance_raster = flow_direction.copy(data=distance_arr)
    distance_raster *= np.abs(flow_direction.rio.resolution()[0])
    return distance_raster

Reach Delineation

delineate_reaches(stream_raster, dem, flow_dir, flow_acc, penalty=None, min_length=500, smooth_window=None, threshold_degrees=1.0)

Segment stream networks into reaches based on slope change points.

Uses the PELT (Pruned Exact Linear Time) changepoint detection algorithm to identify distinct reaches along each stream based on slope variations. Adjacent reaches with similar slopes can be merged using the threshold parameter.

Parameters:
  • stream_raster (DataArray) –

    Labeled stream network where each unique value represents a stream segment (0 for non-stream pixels).

  • dem (DataArray) –

    Digital elevation model for slope calculations.

  • flow_dir (DataArray) –

    Flow direction raster corresponding to the DEM.

  • flow_acc (DataArray) –

    Flow accumulation raster corresponding to the DEM.

  • penalty (float | None, default: None ) –

    PELT algorithm penalty parameter. If None, automatically calculated as log(n) * variance of slope signal.

  • min_length (float, default: 500 ) –

    Minimum reach length in meters.

  • smooth_window (int | None, default: None ) –

    Window size for smoothing slope values before segmentation. If None, no smoothing is applied.

  • threshold_degrees (float, default: 1.0 ) –

    Merge adjacent reaches if slope difference is below this threshold in degrees.

Returns:
  • DataArray

    A raster where each pixel value represents a unique reach ID (0 for non-stream pixels). Reach IDs are computed as reach_number + (stream_id * 1000).

Source code in streamkit/reach.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def delineate_reaches(
    stream_raster: xr.DataArray,
    dem: xr.DataArray,
    flow_dir: xr.DataArray,
    flow_acc: xr.DataArray,
    penalty: float | None = None,
    min_length: float = 500,
    smooth_window: int | None = None,
    threshold_degrees: float = 1.0,
) -> xr.DataArray:
    """Segment stream networks into reaches based on slope change points.

    Uses the PELT (Pruned Exact Linear Time) changepoint detection algorithm
    to identify distinct reaches along each stream based on slope variations.
    Adjacent reaches with similar slopes can be merged using the threshold parameter.

    Args:
        stream_raster: Labeled stream network where each unique value represents
            a stream segment (0 for non-stream pixels).
        dem: Digital elevation model for slope calculations.
        flow_dir: Flow direction raster corresponding to the DEM.
        flow_acc: Flow accumulation raster corresponding to the DEM.
        penalty: PELT algorithm penalty parameter. If None, automatically calculated
            as log(n) * variance of slope signal.
        min_length: Minimum reach length in meters.
        smooth_window: Window size for smoothing slope values before segmentation.
            If None, no smoothing is applied.
        threshold_degrees: Merge adjacent reaches if slope difference is below this
            threshold in degrees.

    Returns:
        A raster where each pixel value represents a unique reach ID (0 for non-stream pixels). Reach IDs are computed as reach_number + (stream_id * 1000).
    """
    reaches = stream_raster.copy(data=np.zeros_like(stream_raster, dtype=np.uint32))
    for stream_val in np.unique(stream_raster):
        if stream_val == 0 or np.isnan(stream_val):
            continue
        else:
            stream_df = _create_stream_points(
                stream_raster == stream_val, flow_dir, flow_acc, dem
            )
            # roughly convert min_length in meters to number of points
            min_size = int(min_length / flow_dir.rio.resolution()[0])
            stream_df = _pelt_reaches(
                stream_df,
                penalty=penalty,
                min_size=min_size,
                smooth_window=smooth_window,
            )
            stream_df = _merge_reaches_by_threshold(
                stream_df, threshold_degrees=threshold_degrees
            )
            stream_df["reach_val"] = stream_df["reach_id"] + (
                np.uint32(stream_val) * 1000
            )
            rows, cols = stream_df["row"].values, stream_df["col"].values
            reaches.data[rows, cols] = stream_df["reach_val"].values
    return reaches

Data Download Utilities

get_huc_data(hucid, nhd_layer='medium', crs='EPSG:4326', dem_resolution=10)

Download hydrological and topographic data for a given HUC ID.

Retrieves NHD flowlines, and digital elevation model (DEM) data for the specified Hydrologic Unit Code (HUC) area.

Parameters:
  • hucid (str) –

    The Hydrologic Unit Code identifier for the watershed area.

  • nhd_layer (str, default: 'medium' ) –

    Options: "medium" or "high". Defaults to "medium"

  • crs (str, default: 'EPSG:4326' ) –

    The coordinate reference system for the output data as an EPSG code or other CRS string. Defaults to "EPSG:4326" (WGS84).

  • dem_resolution (int, default: 10 ) –

    The spatial resolution of the DEM in meters. Defaults to 10.

Returns:
  • Tuple[GeoDataFrame, DataArray]

    (dem raster, flowlines GeoDataFrame)

Source code in streamkit/data.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def get_huc_data(
    hucid: str,
    nhd_layer: str = "medium",
    crs: str = "EPSG:4326",
    dem_resolution: int = 10,
) -> Tuple[gpd.GeoDataFrame, xr.DataArray]:
    """Download hydrological and topographic data for a given HUC ID.

    Retrieves NHD flowlines, and digital elevation model (DEM) data for the
    specified Hydrologic Unit Code (HUC) area.

    Args:
        hucid: The Hydrologic Unit Code identifier for the watershed area.
        nhd_layer: Options: "medium" or "high". Defaults to "medium"
        crs: The coordinate reference system for the output data as an EPSG code
            or other CRS string. Defaults to "EPSG:4326" (WGS84).
        dem_resolution: The spatial resolution of the DEM in meters. Defaults to 10.

    Returns:
        (dem raster, flowlines GeoDataFrame)

    """

    huc_bounds = download_huc_bounds(hucid)
    flowlines = download_flowlines(
        huc_bounds, layer=nhd_layer, linestring_only=True, crs=crs
    )
    dem = download_dem(huc_bounds, dem_resolution, crs)

    return (dem, flowlines)

download_huc_bounds(huc, wbd=None)

Source code in streamkit/data.py
50
51
52
53
54
55
56
57
58
def download_huc_bounds(huc, wbd=None):
    huc = str(huc)
    level = f"huc{str(len(huc))}"

    if wbd is None:
        wbd = WBD(level)

    huc_wbd = wbd.byids(level, huc)
    return huc_wbd

download_flowlines(gdf, layer='medium', linestring_only=True, crs='EPSG:4326')

Source code in streamkit/data.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def download_flowlines(gdf, layer="medium", linestring_only=True, crs="EPSG:4326"):
    if layer == "medium":
        nhd = NHD("flowline_mr")
        flowlines = _download_flowlines(gdf, nhd, linestring_only, crs)
    elif layer == "high":
        nhd_hr = NHDPlusHR("flowline")
        flowlines = _download_flowlines(gdf, nhd_hr, linestring_only, crs)

        # if non network flowlines exist, download and append them
        # need to use try except because some areas may not have non network flowlines
        try:
            nhd_hr_non_network = NHDPlusHR("non_network_flowline")
            non_network_flowlines = _download_flowlines(
                gdf, nhd_hr_non_network, linestring_only, crs
            )
            flowlines = pd.concat([flowlines, non_network_flowlines], ignore_index=True)
        except:
            pass
    else:
        raise ValueError("layer must be either 'medium' or 'high'")
    return flowlines

download_dem(gdf, resolution, crs='EPSG:4326')

Source code in streamkit/data.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def download_dem(gdf, resolution, crs="EPSG:4326"):
    boundary = gdf.union_all()

    # if boundary is MultiPolygon
    if isinstance(boundary, MultiPolygon):
        boundary = boundary.bounds

    try:
        dem = py3dep.static_3dep_dem(boundary, resolution=resolution, crs=gdf.crs)
    except:
        dem = retry_on_smaller(boundary)

    if isinstance(boundary, MultiPolygon):
        dem = dem.rio.clip(gdf.geometry, gdf.crs, drop=True, all_touched=True)

    if dem.rio.crs != crs:
        dem = dem.rio.reproject(crs, resampling=rasterio.enums.Resampling.bilinear)
    return dem

NHD Utilities

rasterize_nhd(nhd_flowlines, flow_dir)

Create a raster representation of NHD flowlines

Converts vector NHD flowlines to a raster stream network by identifying channel heads, tracing streams downslope along the flow direction raster, and linking stream segments. Small streams (< 2 pixels) are removed and the remaining streams are relabeled with consecutive integers.

Parameters:
  • nhd_flowlines (GeoDataFrame) –

    Vector flowline data from the National Hydrography Dataset containing stream geometries.

  • flow_dir (DataArray) –

    Raster of flow directions used to trace streams downslope.

Returns:
  • DataArray

    A raster DataArray where each pixel value represents a unique stream ID (0 for non-stream pixels, consecutive positive integers for stream segments).

Source code in streamkit/nhd.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def rasterize_nhd(
    nhd_flowlines: gpd.GeoDataFrame, flow_dir: xr.DataArray
) -> xr.DataArray:
    """Create a raster representation of NHD flowlines

    Converts vector NHD flowlines to a raster stream network by identifying
    channel heads, tracing streams downslope along the flow direction raster,
    and linking stream segments. Small streams (< 2 pixels) are removed and
    the remaining streams are relabeled with consecutive integers.

    Args:
        nhd_flowlines: Vector flowline data from the National Hydrography Dataset
            containing stream geometries.
        flow_dir: Raster of flow directions used to trace streams downslope.

    Returns:
        A raster DataArray where each pixel value represents a unique stream ID (0 for non-stream pixels, consecutive positive integers for stream segments).
    """
    channel_heads = _nhd_channel_heads(nhd_flowlines)

    xs, ys = zip(*[(pt.x, pt.y) for pt in channel_heads])
    inverse = ~flow_dir.rio.transform()
    indices = [inverse * (x, y) for x, y in zip(xs, ys)]
    points = [
        (int(row), int(col)) for col, row in indices
    ]  # note the order of col, row...

    stream_raster = trace_streams(points, flow_dir)
    stream_raster = link_streams(stream_raster, flow_dir)

    # drop any small streams (< 2 pixels)
    # find all unique stream IDs where the count is < 2
    unique, counts = np.unique(stream_raster.data, return_counts=True)
    small_streams = unique[counts < 2]
    for stream_id in small_streams:
        stream_raster.data[stream_raster.data == stream_id] = 0

    # re-label streams to be consecutive integers
    unique, counts = np.unique(stream_raster.data, return_counts=True)
    new_id = 1
    for stream_id in unique:
        if stream_id == 0:
            continue
        stream_raster.data[stream_raster.data == stream_id] = new_id
        new_id += 1

    return stream_raster