Skip to content

API Reference

bin_coverage(coverage, bin_width, length_axis, normalize=False)

Bin coverage by summing over non-overlapping windows.

Parameters:

Name Type Description Default
coverage NDArray[number]
required
bin_width int

Width of the windows to sum over. Must evenly divide the length of the coverage array along length_axis; otherwise a ValueError is raised.

required
length_axis int
required
normalize bool

Whether to normalize by the length of the bin.

False

Returns:

Type Description
NDArray[number]

Coverage summed into bins of width bin_width along length_axis.

Source code in python/seqpro/_modifiers.py
def bin_coverage(
    coverage: NDArray[np.number[Any]],
    bin_width: int,
    length_axis: int,
    normalize: bool = False,
) -> NDArray[np.number[Any]]:
    """Bin coverage by summing over non-overlapping windows.

    Parameters
    ----------
    coverage
        Array of coverage values to bin. It is not modified.
    bin_width
        Width of the windows to sum over. Must be a positive integer that evenly
        divides the size of `coverage` along `length_axis`.
    length_axis
        Axis of `coverage` along which the windows are laid out.
    normalize
        Whether to normalize by the length of the bin, i.e. divide every bin sum
        by `bin_width`. Integer input yields a floating point result.

    Returns
    -------
    NDArray[np.number]
        Coverage summed into bins of width `bin_width` along `length_axis`.

    Raises
    ------
    ValueError
        If `bin_width` is not positive or does not evenly divide the length.
    """
    if bin_width < 1:
        # Guard explicitly: 0 would otherwise surface as ZeroDivisionError and a
        # negative width would silently produce an empty result.
        raise ValueError("Bin width must be a positive integer.")
    length = coverage.shape[length_axis]
    if length % bin_width != 0:
        raise ValueError("Bin width must evenly divide length.")
    # Start offset of every window; reduceat sums from each offset to the next.
    window_starts = np.arange(0, length, bin_width, dtype=np.intp)
    binned_coverage = np.add.reduceat(coverage, window_starts, axis=length_axis)
    if normalize:
        # Out-of-place division: an in-place `/=` cannot store the float result
        # in an integer array and raises a casting error.
        binned_coverage = binned_coverage / bin_width
    return binned_coverage

cast_seqs(seqs)

cast_seqs(seqs: NDArray[np.uint8]) -> NDArray[np.uint8]
cast_seqs(seqs: StrSeqType) -> NDArray[np.bytes_]
cast_seqs(seqs: SeqType) -> NDArray[np.bytes_ | np.uint8]

Cast any sequence type to be a NumPy array of ASCII characters (or left alone as 8-bit unsigned integers if the input is OHE).

Parameters:

Name Type Description Default
seqs SeqType
required

Returns:

Type Description
NDArray[bytes_ | uint8]

S1 byte array, or unchanged uint8 array if input is OHE.

Source code in python/seqpro/_utils.py
def cast_seqs(seqs: SeqType) -> NDArray[np.bytes_ | np.uint8]:
    """Cast any sequence type to be a NumPy array of ASCII characters (or left alone as
    8-bit unsigned integers if the input is OHE).

    Parameters
    ----------
    seqs

    Returns
    -------
    NDArray[np.bytes_ | np.uint8]
        S1 byte array, or unchanged uint8 array if input is OHE.
    """
    if isinstance(seqs, str):
        if len(seqs) == 0:
            raise ValueError("Empty string cannot be cast to a sequence array.")
        return np.array([seqs], "S").view("S1")
    elif isinstance(seqs, bytes):
        return np.array([seqs]).view("S1")
    elif isinstance(seqs, Sequence):
        return np.array(seqs, "S")[..., None].view("S1")
    elif seqs.dtype.itemsize > 1:  # dtype == U or bigger than S1
        return seqs.astype("S")[..., None].view("S1")
    else:
        return cast(NDArray[np.bytes_ | np.uint8], seqs)

decode_ohe(seqs, ohe_axis, alphabet, unknown_char='N')

decode_ohe(seqs: NDArray[np.uint8], ohe_axis: int, alphabet: NucleotideAlphabet | AminoAlphabet, unknown_char: str = 'N') -> NDArray[np.bytes_]
decode_ohe(seqs: Ragged[np.uint8], ohe_axis: int, alphabet: NucleotideAlphabet | AminoAlphabet, unknown_char: str = 'N') -> Ragged[np.bytes_]

Convert an OHE array to an S1 byte array.

Parameters:

Name Type Description Default
seqs NDArray[uint8] | Ragged[uint8]

OHE array. Ragged input must have shape (n, ~L, A, ...) as produced by ohe().

required
ohe_axis int

Axis of the one-hot dimension. Ignored for Ragged input (always axis 1 of flat data).

required
alphabet NucleotideAlphabet | AminoAlphabet
required
unknown_char str

Single character to use for unknown values, by default "N"

'N'

Returns:

Type Description
NDArray[bytes_] | Ragged[bytes_]

S1 byte array of decoded characters; ohe_axis is removed from the shape.

Source code in python/seqpro/_encoders.py
def decode_ohe(
    seqs: "NDArray[np.uint8] | Ragged[np.uint8]",
    ohe_axis: int,
    alphabet: "NucleotideAlphabet | AminoAlphabet",
    unknown_char: str = "N",
) -> "NDArray[np.bytes_] | Ragged[np.bytes_]":
    """Turn one-hot encoded sequences back into an S1 byte array.

    Parameters
    ----------
    seqs
        OHE array. Ragged input must have shape (n, ~L, A, ...) as produced by ohe().
    ohe_axis
        Axis of the one-hot dimension; may be negative. Ignored for Ragged input,
        where axis 1 of the flat data is always used.
    alphabet
        Alphabet whose characters the one-hot channels correspond to.
    unknown_char
        Single character to use for unknown values, by default "N"

    Returns
    -------
    NDArray[np.bytes_] | Ragged[np.bytes_]
        S1 byte array of decoded characters; ohe_axis is removed from the shape.
    """
    if isinstance(alphabet, NucleotideAlphabet):
        letters = alphabet.array
    else:
        letters = alphabet.aa_array
    # Lookup table: the alphabet's characters followed by the unknown character
    # as the final entry.
    lookup = np.concatenate([letters, [unknown_char.encode("ascii")]])

    if not isinstance(seqs, Ragged):
        char_idx = gufunc_ohe_char_idx(seqs, axis=ohe_axis)  # type: ignore
        # Resolve a negative axis so the output shape can be built by slicing.
        axis = ohe_axis if ohe_axis >= 0 else seqs.ndim + ohe_axis
        decoded_shape = (*seqs.shape[:axis], *seqs.shape[axis + 1 :])
        return lookup[char_idx].reshape(decoded_shape)

    packed = seqs.to_packed()
    n_seqs = len(packed.lengths.ravel())
    # A is always at axis 1 in flat data produced by ohe()
    extra_dims = packed.data.shape[2:]
    char_idx = gufunc_ohe_char_idx(packed.data, axis=1)  # type: ignore
    decoded = lookup[char_idx]
    return Ragged.from_offsets(decoded, (n_seqs, None, *extra_dims), packed.offsets)

decode_tokens(seqs, token_map, unknown_char='N')

decode_tokens(seqs: NDArray[np.int32], token_map: dict[str, int], unknown_char: str = 'N') -> NDArray[np.bytes_]
decode_tokens(seqs: Ragged[np.int32], token_map: dict[str, int], unknown_char: str = 'N') -> Ragged[np.bytes_]

Untokenize sequences. Maps each integer token back to its character. Tokens absent from token_map are replaced with unknown_char.

Parameters:

Name Type Description Default
seqs NDArray[int32] | Ragged[int32]

Token ID array. Ragged input must have dtype np.int32.

required
token_map dict[str, int]

Mapping of characters to tokens (same map used for tokenization).

required
unknown_char str

Character to replace unknown tokens with, by default 'N'.

'N'

Returns:

Type Description
NDArray[bytes_] | Ragged[bytes_]

S1 byte array with the same shape/layout as the input.

Source code in python/seqpro/_encoders.py
def decode_tokens(
    seqs: NDArray[np.int32] | Ragged[np.int32],
    token_map: dict[str, int],
    unknown_char: str = "N",
) -> NDArray[np.bytes_] | Ragged[np.bytes_]:
    """Untokenize sequences by mapping integer tokens back to characters.

    Tokens absent from token_map are replaced with unknown_char.

    Parameters
    ----------
    seqs
        Token ID array. Ragged input must have dtype np.int32.
    token_map
        Mapping of characters to tokens (same map used for tokenization).
    unknown_char
        Character to replace unknown tokens with, by default 'N'.

    Returns
    -------
    NDArray[np.bytes_] | Ragged[np.bytes_]
        S1 byte array with the same shape/layout as the input.
    """
    # Decoding runs the tokenizing kernel with the mapping reversed: token IDs
    # are the lookup source and ASCII codes are the target.
    char_codes = np.array([c.encode("ascii") for c in token_map]).view(np.uint8)
    token_ids = np.array(list(token_map.values()), dtype=np.int32)
    fallback_code = np.uint8(ord(unknown_char))

    if not isinstance(seqs, Ragged):
        decoded = gufunc_tokenize(seqs, token_ids, char_codes, fallback_code)
        return decoded.view("S1")

    packed = seqs.to_packed()
    n_seqs = len(packed.lengths.ravel())
    extra_dims = packed.data.shape[1:]
    decoded = gufunc_tokenize(packed.data, token_ids, char_codes, fallback_code)
    return Ragged.from_offsets(
        decoded.view("S1"), (n_seqs, None, *extra_dims), packed.offsets
    )

gc_content(seqs, normalize=True, length_axis=None, alphabet=None, ohe_axis=None)

Compute the number or proportion of G & C nucleotides.

Parameters:

Name Type Description Default
seqs SeqType
required
normalize bool

True => return proportions; False => return counts.

True
length_axis int | None

Needed if seqs is an array.

None
alphabet NucleotideAlphabet | None

Needed if seqs is OHE.

None
ohe_axis int | None

Needed if seqs is OHE.

None

Returns:

Type Description
NDArray[integer | float64]

Integers if unnormalized, otherwise floats.

Source code in python/seqpro/_analyzers.py
def gc_content(
    seqs: SeqType,
    normalize: bool = True,
    length_axis: int | None = None,
    alphabet: NucleotideAlphabet | None = None,
    ohe_axis: int | None = None,
) -> NDArray[np.integer | np.float64]:
    """Count G and C nucleotides, optionally as a fraction of sequence length.

    Parameters
    ----------
    seqs
        Sequences to analyze, as strings, byte arrays, or OHE arrays.
    normalize
        True => return proportions
        False => return counts
    length_axis
        Needed if seqs is an array. Defaults to the last axis of the cast array.
    alphabet
        Needed if seqs is OHE; used to locate the C and G channels.
    ohe_axis
        Needed if seqs is OHE.

    Returns
    -------
    NDArray[np.integer | np.float64]
        Integers if unnormalized, otherwise floats.

    Raises
    ------
    ValueError
        If seqs is OHE and no alphabet is given.
    """
    check_axes(seqs, length_axis, ohe_axis)

    arr = cast_seqs(seqs)

    # Resolve the length axis to a non-negative index (after casting strings).
    if length_axis is None:
        length_axis = arr.ndim - 1
    elif length_axis < 0:
        length_axis = arr.ndim + length_axis

    gc_bytes = np.array([b"C", b"G"])

    if arr.dtype != np.uint8:
        # Character data: count positions holding C or G.
        is_gc = np.isin(arr, gc_bytes)
        counts = cast(NDArray[np.integer], is_gc.sum(length_axis))
    else:
        # OHE data: sum the C and G channels over both length and one-hot axes.
        if alphabet is None:
            raise ValueError("Need an alphabet to analyze OHE sequences.")
        assert ohe_axis is not None

        gc_channels = np.flatnonzero(np.isin(alphabet.array, gc_bytes))
        gc_only = np.take(arr, gc_channels, ohe_axis)
        counts = cast(NDArray[np.integer], gc_only.sum((length_axis, ohe_axis)))

    if not normalize:
        return counts
    return counts / arr.shape[length_axis]

jitter(*arrays, max_jitter, length_axis, jitter_axes, seed=None)

Randomly jitter data from arrays, using the same jitter across arrays.

Parameters:

Name Type Description Default
*arrays NDArray[DTYPE]

Arrays to be jittered. They must have the same sized jitter and length axes.

()
max_jitter int

Maximum jitter amount.

required
length_axis int
required
jitter_axes int | tuple[int, ...]

Each slice along the jitter axes will be randomly jittered independently. Thus, if jitter_axes = 0, then every slice of data along axis 0 would be jittered independently. If jitter_axes = (0, 1), then each slice along axes 0 and 1 would be randomly jittered independently.

required
seed int | Generator | None

Random seed or generator, by default None

None

Returns:

Type Description
tuple[NDArray[DTYPE], ...]

Jittered arrays. Each will have a new length equal to length - 2*max_jitter.

Raises:

Type Description
ValueError

If any arrays have insufficient length to be jittered.

Source code in python/seqpro/_modifiers.py
def jitter(
    *arrays: NDArray[DTYPE],
    max_jitter: int,
    length_axis: int,
    jitter_axes: int | tuple[int, ...],
    seed: int | np.random.Generator | None = None,
) -> tuple[NDArray[DTYPE], ...]:
    """Randomly jitter data from arrays, applying the same offsets to every array.

    Parameters
    ----------
    *arrays
        Arrays to be jittered. They must have the same sized jitter and length
        axes.
    max_jitter
        Maximum jitter amount. Window start offsets are drawn from
        ``[0, 2 * max_jitter]``.
    length_axis
        Axis along which the jittered window is taken.
    jitter_axes
        Each slice along the jitter axes will be randomly jittered *independently*.
        Thus, if jitter_axes = 0, then every slice of data along axis 0 would be
        jittered independently. If jitter_axes = (0, 1), then each slice along axes 0
        and 1 would be randomly jittered independently.
    seed
        Random seed or generator, by default None

    Returns
    -------
    tuple[NDArray[DTYPE], ...]
        Jittered arrays. Each will have a new length equal to length - 2*max_jitter.

    Raises
    ------
    ValueError
        If any arrays have insufficient length to be jittered.
    """
    if isinstance(jitter_axes, int):
        jitter_axes = (jitter_axes,)
    original_axes = [*jitter_axes, length_axis]

    # move jitter axes and length axis to back such that shape = (..., jitter, length)
    aligned, destination_axes = _align_axes(*arrays, axes=(*jitter_axes, length_axis))

    too_short = [
        i for i, arr in enumerate(aligned) if arr.shape[-1] - 2 * max_jitter <= 0
    ]
    if too_short:
        raise ValueError(
            f"Arrays {too_short} have insufficient length to be jittered with max_jitter={max_jitter}."
        )

    # One start offset per slice along the jitter axes, taken from the first array.
    jitter_shape = aligned[0].shape[-len(jitter_axes) - 1 : -1]
    rng = np.random.default_rng(seed) if seed is None or isinstance(seed, int) else seed
    starts = rng.integers(0, 2 * max_jitter + 1, jitter_shape)

    def _window(arr: NDArray[Any]) -> NDArray[Any]:
        # Slice the window, then restore the caller's axis order.
        kept = _slice_kmers(arr, starts, arr.shape[-1] - 2 * max_jitter)
        return np.moveaxis(kept, destination_axes, original_axes)

    return tuple(_window(arr) for arr in aligned)

k_shuffle(seqs, k, alphabet, *, length_axis=None, ohe_axis=None, seed=None)

Shuffle sequences while preserving k-let frequencies.

Parameters:

Name Type Description Default
seqs SeqType
required
k int

Size of k-lets to preserve frequencies of.

required
alphabet NucleotideAlphabet

Alphabet, needed for OHE sequence input.

required
length_axis int | None

Needed for array input. Axis that corresponds to the length of sequences.

None
ohe_axis int | None

Needed for OHE input. Axis that corresponds to the one hot encoding, should be the same size as the length of the alphabet.

None
seed int | Generator | None

Seed or generator for shuffling. When given a fixed integer seed, the same (seed, batch_size, k) produces byte-identical output across runs and across thread counts; each row in a batch receives an independent shuffle derived from a parent RNG seeded by this value. Changing batch size changes the per-row seeds.

None

Returns:

Type Description
NDArray[bytes_ | uint8]

Shuffled sequences as bytes (S1) or uint8 for string or OHE input, respectively.

Source code in python/seqpro/_modifiers.py
def k_shuffle(
    seqs: SeqType,
    k: int,
    alphabet: NucleotideAlphabet,
    *,
    length_axis: int | None = None,
    ohe_axis: int | None = None,
    seed: int | np.random.Generator | None = None,
) -> NDArray[np.bytes_ | np.uint8]:
    """Shuffle sequences while preserving k-let frequencies.

    Parameters
    ----------
    seqs
        Sequences to shuffle, as strings, byte arrays, or OHE arrays.
    k
        Size of k-lets to preserve frequencies of.
    alphabet
        Alphabet of the sequences; also used to decode and re-encode OHE input.
    length_axis
        Needed for array input. Axis that corresponds to the length of sequences.
        May be negative. Defaults to the last axis of the cast array.
    ohe_axis
        Needed for OHE input. Axis that corresponds to the one hot encoding, should be
        the same size as the length of the alphabet. May be negative.
    seed
        Seed or generator for shuffling. When given a fixed integer seed, the
        same ``(seed, batch_size, k)`` produces byte-identical output across
        runs and across thread counts; each row in a batch receives an
        independent shuffle derived from a parent RNG seeded by this value.
        Changing batch size changes the per-row seeds.

    Returns
    -------
    NDArray[np.bytes_ | np.uint8]
        Shuffled sequences as bytes (S1) or uint8 for string or OHE input, respectively.

    Raises
    ------
    ValueError
        If the input is OHE and `ohe_axis` is not given.
    """

    check_axes(seqs, length_axis, ohe_axis)

    if isinstance(seed, np.random.Generator):
        # Draw a single integer seed from the generator for the shuffle kernel.
        seed = seed.integers(0, np.iinfo(np.int32).max)  # type: ignore

    arr = cast_seqs(seqs)

    # Resolve the length axis against the *input* layout before anything is
    # decoded; None only happens when seqs was str or list[str].
    if length_axis is None:
        length_axis = arr.ndim - 1
    elif length_axis < 0:
        length_axis += arr.ndim

    ohe = arr.dtype == np.uint8
    if ohe:
        if ohe_axis is None:
            raise ValueError("Need an OHE axis to shuffle OHE sequences.")
        if ohe_axis < 0:
            ohe_axis += arr.ndim
        # NOTE(review): assumes alphabet.decode_ohe drops `ohe_axis` from the
        # shape, like the module-level decode_ohe -- confirm.
        arr = alphabet.decode_ohe(cast(NDArray[np.uint8], arr), ohe_axis=ohe_axis)
        if ohe_axis < length_axis:
            # Removing an axis in front of the length axis shifts it down by one.
            length_axis -= 1

    arr = np.moveaxis(arr, length_axis, -1)  # length must be final

    shuffled = _k_shuffle(
        arr.view("u1"), k, len(alphabet), alphabet.array.tobytes(), seed
    ).view("S1")

    shuffled = np.moveaxis(shuffled, -1, length_axis)  # put length back where it was

    if ohe:
        # NOTE(review): assumes alphabet.ohe appends the one-hot axis last, like
        # the module-level ohe -- confirm. moveaxis (not swapaxes) re-inserts it
        # at its original position without disturbing the other axes' order.
        encoded = alphabet.ohe(cast(NDArray[np.bytes_], shuffled))
        shuffled = np.moveaxis(encoded, -1, ohe_axis)

    return shuffled

length(seqs, length_axis=None)

Calculate the length of each sequence.

Parameters:

Name Type Description Default
seqs SeqType

Sequences. For arrays, length_axis selects which axis encodes sequence length; defaults to the last axis.

required
length_axis int | None

Axis to count non-empty characters along. Defaults to the last axis.

None

Returns:

Type Description
NDArray[integer]

Array containing the length of each sequence; length_axis is removed.

Source code in python/seqpro/_analyzers.py
def length(seqs: SeqType, length_axis: int | None = None) -> NDArray[np.integer]:
    """Count the non-empty characters of each sequence.

    Parameters
    ----------
    seqs
        Sequences. For arrays, ``length_axis`` selects which axis encodes sequence
        length; defaults to the last axis.
    length_axis
        Axis to count non-empty characters along. Defaults to the last axis.

    Returns
    -------
    NDArray[np.integer]
        Array containing the length of each sequence; ``length_axis`` is removed.
    """
    axis = -1 if length_axis is None else length_axis
    # Every element that is not the empty byte string counts as one character.
    is_character = cast_seqs(seqs) != b""
    return is_character.sum(axis)

nucleotide_content(seqs, normalize=True, length_axis=None, alphabet=None)

Compute the number or proportion of each nucleotide.

Parameters:

Name Type Description Default
seqs SeqType
required
normalize bool

True => return proportions; False => return counts.

True
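length_axis int | None

Needed if seqs is an array.

None
alphabet NucleotideAlphabet | None

Needed if seqs is not OHE (string or byte input); its characters are the nucleotides that get counted.

None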

Returns:

Type Description
NDArray[integer | floating]

Integers if unnormalized, otherwise floats.

Source code in python/seqpro/_analyzers.py
def nucleotide_content(
    seqs: SeqType,
    normalize: bool = True,
    length_axis: int | None = None,
    alphabet: NucleotideAlphabet | None = None,
) -> NDArray[np.integer | np.floating]:
    """Count each nucleotide, optionally as a fraction of sequence length.

    Parameters
    ----------
    seqs
        Sequences to analyze, as strings, byte arrays, or OHE arrays.
    normalize
        True => return proportions
        False => return counts
    length_axis
        Needed if seqs is an array. Defaults to the last axis of the cast array.
    alphabet
        Needed if seqs is not OHE. One count is produced per alphabet character,
        stored along a new trailing axis.

    Returns
    -------
    NDArray[np.integer | np.floating]
        Integers if unnormalized, otherwise floats.

    Raises
    ------
    ValueError
        If seqs is not OHE and no alphabet is given.
    """
    check_axes(seqs, length_axis, False)

    arr = cast_seqs(seqs)

    # Resolve the length axis to a non-negative index so it can slice the shape.
    if length_axis is None:
        length_axis = arr.ndim - 1
    elif length_axis < 0:
        length_axis = arr.ndim + length_axis

    if arr.dtype == np.uint8:  # OHE
        counts = cast(NDArray[np.integer], arr.sum(length_axis))
    elif alphabet is None:
        raise ValueError("Need an alphabet to analyze string nucleotide content.")
    else:
        # Output keeps every axis except length and gains one slot per character.
        remaining_dims = (*arr.shape[:length_axis], *arr.shape[length_axis + 1 :])
        counts = np.zeros((*remaining_dims, len(alphabet)), dtype=np.int64)
        for slot, nucleotide in enumerate(alphabet.array):
            counts[..., slot] = (arr == nucleotide).sum(length_axis)

    if not normalize:
        return counts
    return counts / arr.shape[length_axis]

ohe(seqs, alphabet)

ohe(seqs: StrSeqType, alphabet: NucleotideAlphabet | AminoAlphabet) -> NDArray[np.uint8]
ohe(seqs: Ragged[np.bytes_], alphabet: NucleotideAlphabet | AminoAlphabet) -> Ragged[np.uint8]

One hot encode sequences against an alphabet.

Parameters:

Name Type Description Default
seqs StrSeqType | Ragged[bytes_]

Sequences to encode. Ragged input must have dtype np.bytes_ (S1).

required
alphabet NucleotideAlphabet | AminoAlphabet
required

Returns:

Type Description
NDArray[uint8] | Ragged[uint8]

One-hot encoded sequences. Dense output has shape (..., length, alphabet_size). Ragged output has shape (n, ~L, A).

Source code in python/seqpro/_encoders.py
def ohe(
    seqs: "StrSeqType | Ragged[np.bytes_]",
    alphabet: "NucleotideAlphabet | AminoAlphabet",
) -> "NDArray[np.uint8] | Ragged[np.uint8]":
    """One hot encode sequences against an alphabet.

    Parameters
    ----------
    seqs
        Sequences to encode. Ragged input must have dtype np.bytes_ (S1).
    alphabet
        Alphabet whose characters define the one-hot channels.

    Returns
    -------
    NDArray[np.uint8] | Ragged[np.uint8]
        One-hot encoded sequences. Dense output has shape (..., length, alphabet_size).
        Ragged output has shape (n, ~L, A).
    """
    if isinstance(alphabet, NucleotideAlphabet):
        letters = alphabet.array
    else:
        letters = alphabet.aa_array

    if not isinstance(seqs, Ragged):
        dense = cast_seqs(seqs)
        return gufunc_ohe(dense.view(np.uint8), letters.view(np.uint8))

    packed = seqs.to_packed()
    n_seqs = len(packed.lengths.ravel())
    n_channels = letters.shape[0]
    extra_dims = packed.data.shape[1:]
    encoded = gufunc_ohe(packed.data.view(np.uint8), letters.view(np.uint8))
    # gufunc appends A last: (..., *trailing, A) → move A to axis 1
    if extra_dims:
        encoded = np.moveaxis(encoded, -1, 1)
    return Ragged.from_offsets(
        encoded, (n_seqs, None, n_channels, *extra_dims), packed.offsets
    )

pad_seqs(seqs, pad, pad_value=None, length=None, length_axis=None)

Pad (or truncate) sequences on either the left, right, or both sides.

Parameters:

Name Type Description Default
seqs SeqType
required
pad Literal['left', 'both', 'right']

How to pad. If padding on both sides and an odd amount of padding is needed, 1 more pad value will be on the right side. Similarly for truncating, if an odd number of characters needs to be truncated, 1 more character will be truncated from the right side.

required
pad_value str | None

Single character to pad sequences with. Needed for string input. Ignored for OHE sequences.

None
length int | None

Needed for character or OHE array input. Length to pad or truncate sequences to. If not given, uses the length of longest sequence.

None
length_axis int | None

Needed for array input.

None

Returns:

Type Description
NDArray[bytes_ | uint8]

Padded (or truncated) sequences as S1 bytes or uint8 for OHE input.

Source code in python/seqpro/_encoders.py
def pad_seqs(
    seqs: SeqType,
    pad: Literal["left", "both", "right"],
    pad_value: str | None = None,
    length: int | None = None,
    length_axis: int | None = None,
) -> NDArray[np.bytes_ | np.uint8]:
    """Pad (or truncate) sequences on either the left, right, or both sides.

    Parameters
    ----------
    seqs
        Sequences to pad, as strings, byte arrays, or OHE arrays.
    pad
        How to pad. If padding on both sides and an odd amount of padding is needed, 1
        more pad value will be on the right side. Similarly for truncating, if an odd
        number of characters needs to be truncated, 1 more character will be truncated
        from the right side.
    pad_value
        Single character to pad sequences with. Needed for string input and for
        byte arrays that have to grow. Ignored for OHE sequences, which are
        padded with zeros.
    length
        Needed for character or OHE array input. Length to pad or truncate sequences to.
        For string input it is optional: when given, sequences are first cut to
        at most `length` characters (always from the right); the result is as
        long as the longest remaining sequence.
    length_axis
        Needed for array input. May be negative.

    Returns
    -------
    NDArray[np.bytes_ | np.uint8]
        Padded (or truncated) sequences as S1 bytes or uint8 for OHE input.

    Raises
    ------
    ValueError
        If `pad_value` is missing where one is needed, or `length` is missing
        for array input.
    """
    check_axes(seqs, length_axis, False)

    string_input = (
        isinstance(seqs, (str, list))
        or (isinstance(seqs, np.ndarray) and seqs.dtype.kind == "U")
        or (isinstance(seqs, np.ndarray) and seqs.dtype.type == np.object_)
    )

    arr = cast_seqs(seqs)

    if length_axis is None:
        length_axis = arr.ndim - 1
    elif length_axis < 0:
        length_axis = arr.ndim + length_axis

    if string_input:
        if pad_value is None:
            raise ValueError("Need a pad value for plain string input.")

        if length is not None:
            arr = arr[..., :length]

        arr_u8 = arr.view(np.uint8)

        # Shorter strings are already right-padded with empty (zero) bytes; the
        # helpers below are what move them for left/both padding.
        if pad == "left":
            arr_u8 = gufunc_pad_left(arr_u8)
        elif pad == "both":
            arr_u8 = gufunc_pad_both(arr_u8)

        # convert empty character '' to pad_value
        arr_u8[arr_u8 == 0] = ord(pad_value)

        return cast(NDArray[np.bytes_], arr_u8.view("S1"))

    if length is None:
        raise ValueError("Need a length for array input.")

    length_diff = arr.shape[length_axis] - length

    if length_diff == 0:
        return arr

    if length_diff > 0:  # longer than needed, truncate
        # Truncation removes elements from the same side(s) padding would go on.
        if pad == "left":
            keep = slice(length_diff, None)
        elif pad == "both":
            n_left = length_diff // 2  # the odd extra element comes off the right
            keep = slice(n_left, n_left + length)
        else:
            keep = slice(None, length)
        return array_slice(arr, length_axis, keep)

    # shorter than needed, pad
    n_pad = -length_diff
    if pad == "left":
        n_left = n_pad
    elif pad == "both":
        n_left = n_pad // 2  # the odd extra pad value goes on the right
    else:
        n_left = 0
    n_right = n_pad - n_left

    if arr.dtype == np.uint8:
        fill: int | bytes = 0
        fill_dtype: Any = np.uint8
    else:
        if pad_value is None:
            raise ValueError("Need a pad value for byte array input.")
        fill = pad_value.encode("ascii")
        fill_dtype = "S1"

    leading_dims = arr.shape[:length_axis]
    trailing_dims = arr.shape[length_axis + 1 :]
    left_block, right_block = (
        np.full((*leading_dims, n, *trailing_dims), fill, dtype=fill_dtype)
        for n in (n_left, n_right)
    )
    # A zero-length block is a no-op in the concatenation, so one call covers
    # left, right and both-sided padding.
    return np.concatenate([left_block, arr, right_block], axis=length_axis)

random_seqs(shape, alphabet, seed=None)

Generate random nucleotide sequences.

Parameters:

Name Type Description Default
shape int | tuple[int, ...]

Shape of sequences to generate

required
alphabet NucleotideAlphabet

Alphabet to sample nucleotides from.

required
seed int | Generator | None

Random seed or generator.

None

Returns:

Type Description
NDArray[bytes_]

Randomly generated sequences of shape shape with S1 dtype.

Source code in python/seqpro/_modifiers.py
def random_seqs(
    shape: int | tuple[int, ...],
    alphabet: NucleotideAlphabet,
    seed: int | np.random.Generator | None = None,
) -> NDArray[np.bytes_]:
    """Draw random sequences by sampling characters from an alphabet.

    Parameters
    ----------
    shape
        Shape of sequences to generate
    alphabet
        Alphabet to sample nucleotides from; characters are drawn from
        ``alphabet.array``.
    seed
        Random seed or generator. An int or None builds a fresh default
        generator; anything else is used as the generator directly.

    Returns
    -------
    NDArray[np.bytes_]
        Randomly generated sequences of shape `shape`, with the dtype of
        ``alphabet.array``.
    """
    rng = np.random.default_rng(seed) if seed is None or isinstance(seed, int) else seed
    return rng.choice(alphabet.array, size=shape)

reverse_complement(seqs, alphabet, length_axis=None, ohe_axis=None)

Reverse complement a sequence.

Parameters:

Name Type Description Default
seqs SeqType
required
alphabet NucleotideAlphabet
required
length_axis int | None

Needed for array input. Length axis, by default None

None
ohe_axis int | None

Needed for OHE input. One hot encoding axis, by default None

None

Returns:

Type Description
NDArray[bytes_ | uint8]

Reverse-complemented sequences as S1 bytes or uint8 for OHE input.

Source code in python/seqpro/_modifiers.py
def reverse_complement(
    seqs: SeqType,
    alphabet: NucleotideAlphabet,
    length_axis: int | None = None,
    ohe_axis: int | None = None,
) -> NDArray[np.bytes_ | np.uint8]:
    """Reverse complement sequences using the given alphabet.

    Functional entry point that delegates to ``alphabet.reverse_complement``.

    Parameters
    ----------
    seqs
        Sequences to reverse complement.
    alphabet
        Alphabet that performs the reverse complement.
    length_axis
        Needed for array input. Length axis, by default None
    ohe_axis
        Needed for OHE input. One hot encoding axis, by default None

    Returns
    -------
    NDArray[np.bytes_ | np.uint8]
        Reverse-complemented sequences as S1 bytes or uint8 for OHE input.
    """
    revcomp = alphabet.reverse_complement(seqs, length_axis, ohe_axis)
    return revcomp

tokenize(seqs, token_map, unknown_token, out=None, *, parallel=None)

tokenize(seqs: StrSeqType, token_map: dict[str, int], unknown_token: int, out: NDArray[np.int32] | None = None, *, parallel: bool | None = None) -> NDArray[np.int32]
tokenize(seqs: Ragged[np.bytes_], token_map: dict[str, int], unknown_token: int, out: None = None, *, parallel: bool | None = None) -> Ragged[np.int32]

Tokenize sequences. Maps each character to its integer token. Characters absent from token_map are replaced with unknown_token.

Parameters:

Name Type Description Default
seqs StrSeqType | Ragged[bytes_]

Sequences to tokenize. Ragged input must have dtype np.bytes_ (S1).

required
token_map dict[str, int]

Mapping of characters to tokens.

required
unknown_token int

Token to use for unknown values.

required
out NDArray[int32] | None

Output array to store the result in. Only valid for non-Ragged input. Must have dtype np.int32; any other dtype raises TypeError.

None
parallel bool | None

Escape hatch overriding the size-based heuristic for choosing between the single-threaded np.take and the parallel Numba LUT gather. None (default) uses the heuristic (parallel past _TOKENIZE_PARALLEL_THRESHOLD elements); True forces the parallel kernel; False forces single-threaded. parallel=True is incompatible with a non-C-contiguous out and raises ValueError.

None

Returns:

Type Description
NDArray[int32] | Ragged[int32]

Integer token IDs with the same shape/layout as the input.

Source code in python/seqpro/_encoders.py
def tokenize(
    seqs: StrSeqType | Ragged[np.bytes_],
    token_map: dict[str, int],
    unknown_token: int,
    out: NDArray[np.int32] | None = None,
    *,
    parallel: bool | None = None,
) -> NDArray[np.int32] | Ragged[np.int32]:
    """Tokenize sequences by mapping every character to its integer token.

    Characters absent from token_map are replaced with unknown_token.

    Parameters
    ----------
    seqs
        Sequences to tokenize. Ragged input must have dtype np.bytes_ (S1).
    token_map
        Mapping of characters to tokens.
    unknown_token
        Token to use for unknown values.
    out
        Output array to store the result in. Only valid for non-Ragged input.
        Must have dtype ``np.int32``; any other dtype raises ``TypeError``.
    parallel
        Escape hatch overriding the size-based heuristic for choosing between the
        single-threaded ``np.take`` and the parallel Numba LUT gather. ``None``
        (default) uses the heuristic (parallel past
        ``_TOKENIZE_PARALLEL_THRESHOLD`` elements); ``True`` forces the parallel
        kernel; ``False`` forces single-threaded. ``parallel=True`` is
        incompatible with a non-C-contiguous ``out`` and raises ``ValueError``.

    Returns
    -------
    NDArray[np.int32] | Ragged[np.int32]
        Integer token IDs with the same shape/layout as the input.

    Raises
    ------
    TypeError
        If `out` is given and its dtype is not int32.
    ValueError
        If ``parallel=True`` is combined with a non-C-contiguous `out`.
    """
    if out is not None and out.dtype != np.int32:
        raise TypeError(f"out must have dtype int32, got {out.dtype}.")

    # 256-entry lookup table indexed by byte value: every slot starts as the
    # unknown token and the mapped characters are overwritten. Tokenizing is
    # then a gather, lut[bytes].
    char_codes = np.array([c.encode("ascii") for c in token_map]).view(np.uint8)
    token_ids = np.array(list(token_map.values()), dtype=np.int32)
    lut = np.full(256, np.int32(unknown_token), dtype=np.int32)
    lut[char_codes] = token_ids

    if isinstance(seqs, Ragged):
        packed = seqs.to_packed()
        n_seqs = len(packed.lengths.ravel())
        extra_dims = packed.data.shape[1:]
        codes = packed.data.view(np.uint8)
        if parallel is None:
            go_parallel = codes.size >= _TOKENIZE_PARALLEL_THRESHOLD
        else:
            go_parallel = parallel
        if not go_parallel:
            tokens = np.take(lut, codes)
        else:
            tokens = np.empty(codes.shape, dtype=np.int32)
            # The kernel works on flat views and fills `tokens` in place.
            lut_gather(np.ascontiguousarray(codes).reshape(-1), lut, tokens.reshape(-1))
        return Ragged.from_offsets(tokens, (n_seqs, None, *extra_dims), packed.offsets)

    codes = cast_seqs(seqs).view(np.uint8)

    # A strided out= can't be flattened to a writable view, so the parallel
    # kernel can't write through it; such out= always takes the np.take path
    # (which handles arbitrary strides). parallel=True can't honor it -> error.
    strided_out = out is not None and not out.flags.c_contiguous
    if strided_out and parallel is True:
        raise ValueError(
            "parallel=True requires a C-contiguous out array, got a "
            "non-contiguous out. Use parallel=None/False or a contiguous out."
        )

    go_parallel = (
        codes.size >= _TOKENIZE_PARALLEL_THRESHOLD if parallel is None else parallel
    )
    if strided_out or not go_parallel:
        return np.take(lut, codes, out=out)

    destination = np.empty(codes.shape, dtype=np.int32) if out is None else out
    lut_gather(np.ascontiguousarray(codes).reshape(-1), lut, destination.reshape(-1))
    return destination