API documentation

bam

Yields records from a BAM file using pysam

DatabaseHandler

Database handler for multiprocessing.

Source code in src/fast5_rekindler/bam.py
class DatabaseHandler:
    """Database handler for multiprocessing."""

    def __init__(self, output_dir: str, num_processes: int):
        """Initializes the database handler."""
        self.output_dir = output_dir
        self.num_processes = num_processes

    def init_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
        """Initializes the database for each worker."""
        unique_db_path = os.path.join(self.output_dir, f"tmp_worker_{worker_id}.db")
        worker_state["db_connection"] = sqlite3.connect(unique_db_path)
        worker_state["db_cursor"] = worker_state["db_connection"].cursor()
        worker_state["db_cursor"].execute(TABLE_INIT_QUERY)
        worker_state["db_connection"].commit()

    def exit_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
        """Closes the database connection for each worker."""
        conn = worker_state["db_connection"]
        cursor = worker_state["db_cursor"]
        conn.commit()
        cursor.close()
        conn.close()

    def merge_databases(self) -> None:
        """Merges the databases from each worker into a single database."""
        db_path = os.path.join(self.output_dir, "bam_db.db")
        main_conn = sqlite3.connect(db_path)
        main_cursor = main_conn.cursor()
        logger.info("Merging databases from each worker into a single database...")
        main_cursor.execute(TABLE_INIT_QUERY)

        try:
            for i in range(self.num_processes):
                worker_db_path = os.path.join(self.output_dir, f"tmp_worker_{i}.db")

                try:
                    main_cursor.execute(
                        f"ATTACH DATABASE '{worker_db_path}' AS worker_db"
                    )
                    main_cursor.execute("BEGIN")

                    main_cursor.execute(
                        """
                        INSERT OR IGNORE INTO bam_db
                        SELECT * FROM worker_db.bam_db
                        """
                    )

                    main_cursor.execute("COMMIT")

                except Exception:
                    logger.warning(
                        "May be an empty database # {}. Nothing to worry about.",
                        i,
                    )

                finally:
                    main_cursor.execute("DETACH DATABASE worker_db")
                    os.remove(worker_db_path)

        finally:
            main_cursor.close()
            main_conn.close()
            logger.info("Merging databases complete.")

__init__(output_dir: str, num_processes: int)

Initializes the database handler.

Source code in src/fast5_rekindler/bam.py
def __init__(self, output_dir: str, num_processes: int):
    """Initializes the database handler."""
    self.output_dir = output_dir
    self.num_processes = num_processes

exit_func(worker_id: int, worker_state: Dict[str, Any]) -> None

Closes the database connection for each worker.

Source code in src/fast5_rekindler/bam.py
def exit_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
    """Closes the database connection for each worker."""
    conn = worker_state["db_connection"]
    cursor = worker_state["db_cursor"]
    conn.commit()
    cursor.close()
    conn.close()

init_func(worker_id: int, worker_state: Dict[str, Any]) -> None

Initializes the database for each worker.

Source code in src/fast5_rekindler/bam.py
def init_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
    """Initializes the database for each worker."""
    unique_db_path = os.path.join(self.output_dir, f"tmp_worker_{worker_id}.db")
    worker_state["db_connection"] = sqlite3.connect(unique_db_path)
    worker_state["db_cursor"] = worker_state["db_connection"].cursor()
    worker_state["db_cursor"].execute(TABLE_INIT_QUERY)
    worker_state["db_connection"].commit()

merge_databases() -> None

Merges the databases from each worker into a single database.

Source code in src/fast5_rekindler/bam.py
def merge_databases(self) -> None:
    """Merges the databases from each worker into a single database."""
    db_path = os.path.join(self.output_dir, "bam_db.db")
    main_conn = sqlite3.connect(db_path)
    main_cursor = main_conn.cursor()
    logger.info("Merging databases from each worker into a single database...")
    main_cursor.execute(TABLE_INIT_QUERY)

    try:
        for i in range(self.num_processes):
            worker_db_path = os.path.join(self.output_dir, f"tmp_worker_{i}.db")

            try:
                main_cursor.execute(
                    f"ATTACH DATABASE '{worker_db_path}' AS worker_db"
                )
                main_cursor.execute("BEGIN")

                main_cursor.execute(
                    """
                    INSERT OR IGNORE INTO bam_db
                    SELECT * FROM worker_db.bam_db
                    """
                )

                main_cursor.execute("COMMIT")

            except Exception:
                logger.warning(
                    "May be an empty database # {}. Nothing to worry about.",
                    i,
                )

            finally:
                main_cursor.execute("DETACH DATABASE worker_db")
                os.remove(worker_db_path)

    finally:
        main_cursor.close()
        main_conn.close()
        logger.info("Merging databases complete.")

build_bam_db(bam_filepath: str, output_dir: str, num_processes: int) -> None

Builds a database from a BAM file.

Parameters:

- bam_filepath (str): Path to the BAM file. (required)
- output_dir (str): Path to the output directory. (required)
- num_processes (int): Number of processes to use. (required)

Returns:

- None

Source code in src/fast5_rekindler/bam.py
def build_bam_db(bam_filepath: str, output_dir: str, num_processes: int) -> None:
    """Builds a database from a BAM file.

    Params:
        bam_filepath (str): Path to the BAM file.
        output_dir (str): Path to the output directory.
        num_processes (int): Number of processes to use.

    Returns:
        None
    """
    num_bam_records = get_total_records(bam_filepath)
    db_handler = DatabaseHandler(output_dir, num_processes)
    with WorkerPool(
        n_jobs=num_processes, use_worker_state=True, pass_worker_id=True
    ) as pool:
        pool.map(
            insert_bamdata_in_db_worker,
            [(bam_data,) for bam_data in process_bam_records(bam_filepath)],
            iterable_len=num_bam_records,
            worker_init=db_handler.init_func,
            worker_exit=db_handler.exit_func,
            progress_bar=True,
            progress_bar_options={"desc": "", "unit": "records", "colour": "green"},
        )
    db_handler.merge_databases()
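
A minimal usage sketch (paths are illustrative; the module path follows the source location shown above). After the call, the merged database is written to bam_db.db inside output_dir:

from fast5_rekindler.bam import build_bam_db

build_bam_db(
    bam_filepath="/data/calls.bam",  # must contain moves info (the "mv" tag)
    output_dir="/data/output",
    num_processes=4,
)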

get_signal_info(record: pysam.AlignedSegment) -> Dict[str, Any]

Returns a dictionary containing the signal information from a BAM record.

Parameters:

- record (pysam.AlignedSegment): A BAM record. (required)

Returns:

- signal_info (Dict[str, Any]): A dictionary containing the signal information.

Source code in src/fast5_rekindler/bam.py
def get_signal_info(record: pysam.AlignedSegment) -> Dict[str, Any]:
    """Returns a dictionary containing the signal information from a BAM record.

    Params:
        record (pysam.AlignedSegment): A BAM record.

    Returns:
        signal_info (Dict[str, Any]): A dictionary containing the signal information.
    """
    signal_info = {}
    tags_dict = dict(record.tags)  # type: ignore
    try:
        signal_info["moves_table"] = tags_dict["mv"]
    except KeyError:
        logger.exception(
            "The BAM file does not contain moves information. Please use --emit-moves option when basecalling using Doardo."
        )
        raise SystemExit
    signal_info["moves_step"] = signal_info["moves_table"].pop(0)
    signal_info["read_id"] = record.query_name
    signal_info["start_sample"] = tags_dict["ts"]
    signal_info["num_samples"] = tags_dict["ns"]
    signal_info["quality_score"] = tags_dict["qs"]
    signal_info["channel"] = tags_dict["ch"]
    signal_info["signal_mean"] = tags_dict["sm"]
    signal_info["signal_sd"] = tags_dict["sd"]
    signal_info["is_qcfail"] = record.is_qcfail
    signal_info["is_reverse"] = record.is_reverse
    signal_info["is_forward"] = record.is_forward  # type: ignore
    signal_info["is_mapped"] = record.is_mapped  # type: ignore
    signal_info["is_supplementary"] = record.is_supplementary
    signal_info["is_secondary"] = record.is_secondary
    signal_info["read_quality"] = record.qual  # type: ignore
    signal_info["read_fasta"] = record.query_sequence
    signal_info["mapping_quality"] = record.mapping_quality
    signal_info["parent_read_id"] = tags_dict.get("pi", "")
    signal_info["split_point"] = tags_dict.get("sp", 0)
    signal_info["time_stamp"] = tags_dict.get("st")

    return signal_info
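
For reference, a small pysam sketch reading the same tags directly (the path is illustrative; check_sq=False permits unaligned BAM files):

import pysam

with pysam.AlignmentFile("/data/calls.bam", "rb", check_sq=False) as bam:
    for record in bam:
        if not record.has_tag("mv"):
            continue  # no moves table on this record
        moves = record.get_tag("mv")  # array: first element is the stride
        print(record.query_name, len(moves))
        break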

get_total_records(bam_filepath: str) -> int

Returns the total number of records in a BAM file.

Parameters:

- bam_filepath (str): Path to the BAM file. (required)

Returns:

- total_records (int): Total number of records in the BAM file.

Source code in src/fast5_rekindler/bam.py
def get_total_records(bam_filepath: str) -> int:
    """Returns the total number of records in a BAM file.

    Params:
        bam_filepath (str): Path to the BAM file.

    Returns:
        total_records (int): Total number of records in the BAM file.
    """
    with pysam.AlignmentFile(bam_filepath) as bam_file:
        total_records = sum(1 for _ in bam_file)
    return total_records

insert_bamdata_in_db_worker(worker_id: int, worker_state: Dict[str, Any], bam_data: Dict[str, Any]) -> None

Inserts the signal information from a BAM record into the BAM database.

Parameters:

- worker_id (int): Worker ID. (required)
- worker_state (Dict[str, Any]): Worker state. (required)
- bam_data (Dict[str, Any]): A dictionary containing the signal information. (required)

Returns:

- None

Source code in src/fast5_rekindler/bam.py
def insert_bamdata_in_db_worker(
    worker_id: int, worker_state: Dict[str, Any], bam_data: Dict[str, Any]
) -> None:
    """Inserts the signal information from a BAM record into the BAM database.

    Params:
        worker_id (int): Worker ID.
        worker_state (Dict[str, Any]): Worker state.
        bam_data (Dict[str, Any]): A dictionary containing the signal information.

    Returns:
        None
    """
    cursor = worker_state["db_cursor"]
    if bam_data.get("is_secondary"):
        # Skip secondary alignments as they do not contain FASTA information.
        # The FASTA information is only present in the primary alignment,
        # so it is picked up from the record of the primary alignment.
        return

    try:
        # Extract the required attributes from the BAM data dictionary
        block_stride = bam_data.get("moves_step")
        moves_table = bam_data.get("moves_table")
        called_events = len(moves_table) if moves_table is not None else 0
        mean_qscore = bam_data.get("quality_score")
        read_fasta = bam_data.get("read_fasta")
        sequence_length = len(read_fasta) if read_fasta is not None else 0
        duration_template = bam_data.get("num_samples")
        first_sample_template = bam_data.get("start_sample")
        num_events_template = called_events
        moves_table = bam_data.get("moves_table").tobytes()  # type: ignore
        read_fasta = bam_data.get("read_fasta")
        read_quality = bam_data.get("read_quality")
        read_id = bam_data.get("read_id")
        parent_read_id = bam_data.get("parent_read_id")
        split_point = bam_data.get("split_point")
        time_stamp = bam_data.get("time_stamp")

        insert_query = """INSERT OR IGNORE INTO bam_db (
                            block_stride,
                            called_events,
                            mean_qscore,
                            sequence_length,
                            duration_template,
                            first_sample_template,
                            num_events_template,
                            moves_table,
                            read_fasta,
                            read_quality,
                            read_id,
                            parent_read_id,
                            split_point,
                            time_stamp
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""

        cursor.execute(
            insert_query,
            (
                block_stride,
                called_events,
                mean_qscore,
                sequence_length,
                duration_template,
                first_sample_template,
                num_events_template,
                moves_table,
                read_fasta,
                read_quality,
                read_id,
                parent_read_id,
                split_point,
                time_stamp,
            ),
        )
    except Exception:
        logger.exception(
            "An error occurred in worker {}, read_id: {}", worker_id, read_id
        )

process_bam_records(bam_filepath: str) -> Generator[Dict[str, Any], None, None]

Yields records from a BAM file using pysam one-by-one and extracts the signal information from each record.

Parameters:

- bam_filepath (str): Path to the BAM file. (required)

Yields:

- signal_info (Dict[str, Any]): A dictionary containing the signal information for each record.

Source code in src/fast5_rekindler/bam.py
def process_bam_records(bam_filepath: str) -> Generator[Dict[str, Any], None, None]:
    """Yields records from a BAM file using pysam one-by-one and
    extracts the signal information from each record.

    Params:
        bam_filepath (str): Path to the BAM file.

    Yields:
        signal_info (Dict[str, Any]): A dictionary containing the signal information.

    """
    index_filepath = f"{bam_filepath}.bai"

    if not os.path.exists(index_filepath):
        pysam.index(bam_filepath)  # type: ignore

    with pysam.AlignmentFile(bam_filepath, "rb") as bam_file:
        for record in bam_file:
            yield get_signal_info(record)
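
A minimal consumption sketch (path illustrative): the generator streams one dictionary per BAM record, so the whole file never has to fit in memory:

from fast5_rekindler.bam import process_bam_records

for signal_info in process_bam_records("/data/calls.bam"):
    print(signal_info["read_id"], signal_info["num_samples"])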

cleanup

cleanup_fast5(fast5_filepath: str) -> None

Delete configuration groups from a FAST5 file. These groups are added automatically by ont-fast5-api but are not needed in rekindled FAST5 files.

Parameters:

- fast5_filepath (str): Path to a FAST5 file. (required)

Returns:

- None

Source code in src/fast5_rekindler/cleanup.py
def cleanup_fast5(fast5_filepath: str) -> None:
    """
    Delete configuration groups from a FAST5 file.
    These groups are added automatically by ont-fast5-api
    but are not needed in rekindled FAST5 files.

    Params:
        fast5_filepath (str): Path to a FAST5 file.

    Returns:
        None
    """
    with h5py.File(fast5_filepath, "r+") as file:
        delete_configuration_groups(file)

delete_configuration_groups(group: Any) -> None

Recursively delete groups named 'Configuration'.

Parameters:

- group (h5py.Group): A group object. (required)

Returns:

- None

Source code in src/fast5_rekindler/cleanup.py
def delete_configuration_groups(group: Any) -> None:
    """
    Recursively delete groups named 'Configuration'.

    Params:
        group (h5py.Group): A group object.

    Returns:
        None
    """
    # Make a list of keys to iterate over to avoid RuntimeError for changing size during iteration
    keys = list(group.keys())
    for key in keys:
        item = group[key]
        if isinstance(item, h5py.Group):
            # If the item is a group and its name is 'Configuration', delete it
            if key == "Configuration":
                del group[key]
            else:
                # Otherwise, recursively check its subgroups
                delete_configuration_groups(item)
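
A self-contained sketch (file name and group layout are illustrative): build a small HDF5 file containing a nested 'Configuration' group, then strip it with cleanup_fast5:

import h5py

from fast5_rekindler.cleanup import cleanup_fast5

with h5py.File("demo.fast5", "w") as f:
    analyses = f.create_group("read_0000/Analyses")
    analyses.create_group("Configuration")    # deleted by the cleanup
    analyses.create_group("Basecall_1D_000")  # kept

cleanup_fast5("demo.fast5")

with h5py.File("demo.fast5", "r") as f:
    print(list(f["read_0000/Analyses"].keys()))  # ['Basecall_1D_000']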

cli

fast5_rekindler() -> None

CLI entry point for the fast5_rekindler package.

Source code in src/fast5_rekindler/cli.py
def fast5_rekindler() -> None:
    """CLI entry point for rekindle_fast5 package."""
    app_version = get_version()

    # Use RawTextHelpFormatter to preserve newlines
    parser = argparse.ArgumentParser(
        description="Creates legacy basecalled FAST5 files by collating information in BAM and POD5 files.",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="""Example usage:
fast5_rekindler /path/to/calls.bam /path/to/pod5_dir /path/to/output_dir -n 4
        """,
    )

    # Positional arguments
    parser.add_argument(
        "bam_filepath",
        type=str,
        help=(
            "Path to the BAM file. \n"
            "This file must contain moves information. \n"
            "To add moves info to the BAM file, \n"
            "use --emit-moves option when basecalling using Doardo."
        ),
    )
    parser.add_argument("pod5_dir", type=str, help="Directory containing POD5 files.")
    parser.add_argument(
        "output_dir",
        type=str,
        help="Directory where the output FAST5 files will be saved.",
    )

    # Optional arguments (options)
    parser.add_argument(
        "-n",
        "--num-processes",
        type=int,
        default=4,
        metavar="",
        help="Number of processes to use. Default is 4.",
    )

    # Version argument
    parser.add_argument(
        "-v",
        "--version",
        action="version",
        version=f"%(prog)s {app_version}",
        help="Show program's version number and exit",
    )

    args = parser.parse_args()

    # Initialize the logger
    log_filepath = configure_logger(new_log_directory=args.output_dir)

    width = 15
    full_command = " ".join(sys.argv)
    logger.info("You have issued the following command: {}\n", full_command)

    logger.info(
        """You have configured the following options:
        {} {}
        {} {}
        {} {}
        {} {}
        {} {}, \n""",
        "• BAM filepath:".ljust(width),
        args.bam_filepath,
        "• POD5 dir:".ljust(width),
        args.pod5_dir,
        "• Output dir:".ljust(width),
        args.output_dir,
        "• Num CPUs:".ljust(width),
        args.num_processes,
        "• Log file:".ljust(width),
        log_filepath,
    )

    # Call the function with parsed arguments
    collate_bam_fast5(
        args.bam_filepath, args.pod5_dir, args.output_dir, args.num_processes
    )

collate

DatabaseHandler

Class that handles database connections for each worker. Each worker can do READ operations in parallel for fetching BAM information for a given read_id.

Source code in src/fast5_rekindler/collate.py
class DatabaseHandler:
    """Class that handles database connections for each worker.
    Each worker can do READ operations in parallel for fetching
    BAM information for a given read_id.
    """

    def __init__(self, database_path: str) -> None:
        """Initializes the database path.

        Params:
            database_path (str): Path to the database file.
        """
        self.database_path = database_path

    def open_database(self) -> Tuple[sqlite3.Cursor, sqlite3.Connection]:
        """Connect to SQLite database (or create it if it doesn't exist)"""
        conn = sqlite3.connect(self.database_path)
        cursor = conn.cursor()
        return cursor, conn

    def init_func(self, worker_state: Dict[str, Any]) -> None:
        """Initializes the database connection for each worker."""
        worker_state["db_cursor"], worker_state["db_connection"] = self.open_database()

    def exit_func(self, worker_state: Dict[str, Any]) -> None:
        """Closes the database connection for each worker."""
        conn = worker_state["db_connection"]
        cursor = worker_state["db_cursor"]
        cursor.close()
        conn.close()

__init__(database_path: str) -> None

Initializes the database path.

Parameters:

- database_path (str): Path to the database file. (required)

Source code in src/fast5_rekindler/collate.py
def __init__(self, database_path: str) -> None:
    """Initializes the database path.

    Params:
        database_path (str): Path to the database file.
    """
    self.database_path = database_path

exit_func(worker_state: Dict[str, Any]) -> None

Closes the database connection for each worker.

Source code in src/fast5_rekindler/collate.py
def exit_func(self, worker_state: Dict[str, Any]) -> None:
    """Closes the database connection for each worker."""
    conn = worker_state["db_connection"]
    cursor = worker_state["db_cursor"]
    cursor.close()
    conn.close()

init_func(worker_state: Dict[str, Any]) -> None

Initializes the database connection for each worker.

Source code in src/fast5_rekindler/collate.py
def init_func(self, worker_state: Dict[str, Any]) -> None:
    """Initializes the database connection for each worker."""
    worker_state["db_cursor"], worker_state["db_connection"] = self.open_database()

open_database() -> Tuple[sqlite3.Cursor, sqlite3.Connection]

Connect to SQLite database (or create it if it doesn't exist)

Source code in src/fast5_rekindler/collate.py
def open_database(self) -> Tuple[sqlite3.Cursor, sqlite3.Connection]:
    """Connect to SQLite database (or create it if it doesn't exist)"""
    conn = sqlite3.connect(self.database_path)
    cursor = conn.cursor()
    return cursor, conn

add_info_to_fast5_read(f5_read: Any, data: Dict[str, Any], read_id: str) -> None

Adds basecalling information to the raw FAST5 read.

Parameters:

- f5_read (Any): Raw FAST5 read object. (required)
- data (Dict[str, Any]): Dictionary containing basecalling information. (required)
- read_id (str): Read ID. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def add_info_to_fast5_read(f5_read: Any, data: Dict[str, Any], read_id: str) -> None:
    """Adds basecalling information the raw FAST5 read.

    Params:
        f5_read (Any): Raw FAST5 read object.
        data (Dict[str, Any]): Dictionary containing basecalling information.
        read_id (str): Read ID.

    Returns:
        None
    """
    f5_read.add_analysis(
        component="basecall_1d",
        group_name="Basecall_1D_000",
        attrs={
            "model_type": "flipflop",
            "name": "ONT Guppy basecalling software.",
            "segmentation": "Segmentation_000",
            "time_stamp": data.get("time_stamp"),
            "version": "Unknow",
        },
        config=None,
    )
    f5_read.add_analysis_subgroup(
        group_name="Basecall_1D_000", subgroup_name="BaseCalled_template"
    )
    moves_array = np.frombuffer(data["moves_table"], dtype="uint8")
    f5_read.add_analysis_dataset(
        group_name="Basecall_1D_000/BaseCalled_template",
        dataset_name="Move",
        data=moves_array,
        attrs=None,
    )
    fastq = (
        "@"
        + read_id
        + "\n"
        + data["read_fasta"]
        + "\n"
        + "+"
        + "\n"
        + data["read_quality"]
    )
    f5_read.add_analysis_dataset(
        group_name="Basecall_1D_000/BaseCalled_template",
        dataset_name="Fastq",
        data=fastq,
        attrs=None,
    )
    f5_read.add_analysis_subgroup(
        group_name="Basecall_1D_000/Summary",
        subgroup_name="basecall_1d_template",
        attrs={
            "block_stride": data["block_stride"],
            "called_events": data["called_events"],
            "mean_qscore": data["mean_qscore"],
            "scaling_mean_abs_dev": 0,
            "scaling_median": 0,
            "sequence_length": data["sequence_length"],
            "skip_prob": 0,
            "stay_prob": 0,
            "step_prob": 0,
            "strand_score": 0,
        },
    )
    f5_read.add_analysis(
        component="segmentation",
        group_name="Segmentation_000",
        attrs={
            "name": "ONT Guppy basecalling software.",
            "time_stamp": "Unknown",
            "version": "Unknown",
        },
    )
    f5_read.add_analysis_subgroup(
        group_name="Segmentation_000/Summary",
        subgroup_name="segmentation",
        attrs={
            "adapter_max": 0,
            "duration_template": data["duration_template"],
            "first_sample_template": data["first_sample_template"],
            "has_complement": 0,
            "has_template": 0,
            "med_abs_dev_template": 0,
            "median_template": 0,
            "num_events_template": data["num_events_template"],
            "pt_detect_success": 0,
            "pt_median": 0,
            "pt_sd": 0,
            "parent_read_id": data["parent_read_id"],
        },
    )

collate_bam_fast5(bam_filepath: str, pod5_dir: str, output_dir: str, num_processes: int) -> None

Main function that combines information from BAM and all POD5 files to create legacy basecalled FAST5 files.

Parameters:

- bam_filepath (str): Path to the BAM file. This file must contain moves information. (required)
- pod5_dir (str): Path to the directory containing POD5 files. (required)
- output_dir (str): Path to the directory where the output FAST5 files will be saved. N.B.: The POD5 directory structure will be replicated in the output directory. (required)
- num_processes (int): Number of processes to use. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def collate_bam_fast5(
    bam_filepath: str, pod5_dir: str, output_dir: str, num_processes: int
) -> None:
    """Main function that combines information from BAM and all POD5 files to create
    legacy basecalled FAST5 files.

    Params:
        bam_filepath (str): Path to the BAM file. This file must contain moves information.
        pod5_dir (str): Path to the directory containing POD5 files.
        output_dir (str): Path to the directory where the output FAST5 files will be saved.
                          N.B.: The POD5 directory structure will be replicated in
                          the output directory.
        num_processes (int): Number of processes to use.

    Returns:
        None
    """
    # 1. Convert POD5 to FAST5
    logger.info("Step 1/4: Converting POD5 files to raw FAST5 files")
    convert_pod5_to_raw_fast5(pod5_dir, output_dir, num_processes)

    # 2. Make index database
    logger.info("Step 2/4: Building an index database")
    build_index_db(
        fast5_dir=output_dir, output_dir=output_dir, num_processes=num_processes
    )

    # 3. Make BAM database
    logger.info("Step 3/4: Reading BAM file info to the database")
    build_bam_db(bam_filepath, output_dir, num_processes)

    # 4. Join databases
    database_path = join_databases(output_dir)

    # 5. Make a list of all FAST5 files
    logger.info("Step 4/4: Writing basecalling information to FAST5 files")
    fast5_filepaths_list = make_fast5_files_list(output_dir)

    db_handler = DatabaseHandler(database_path)

    num_fast_files = len(fast5_filepaths_list)
    with WorkerPool(
        n_jobs=min(num_processes, num_fast_files), use_worker_state=True
    ) as pool:
        pool.map(
            collate_bam_fast5_worker,
            fast5_filepaths_list,
            iterable_len=num_fast_files,
            worker_init=db_handler.init_func,  # Passing method of db_handler object
            worker_exit=db_handler.exit_func,  # Passing method of db_handler object
            progress_bar=True,
            progress_bar_options={"desc": "", "unit": "files", "colour": "green"},
        )

    os.remove(db_handler.database_path)
    logger.success("Finished successfully!")
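
The same pipeline can be driven from Python instead of the CLI; a minimal sketch with illustrative paths:

from fast5_rekindler.collate import collate_bam_fast5

collate_bam_fast5(
    bam_filepath="/data/calls.bam",
    pod5_dir="/data/pod5",
    output_dir="/data/fast5_out",
    num_processes=4,
)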

collate_bam_fast5_worker(worker_state: Dict[str, Any], fast5_filepath: str) -> None

Worker function that fetches information from the BAM file and writes it to a raw FAST5 file in order to convert it into a basecalled FAST5 file. Unfortunately, some information, such as the trace table, can never be recreated due to a change in the basecaller design.

Parameters:

- worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor. (required)
- fast5_filepath (str): Path to the raw FAST5 file. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def collate_bam_fast5_worker(worker_state: Dict[str, Any], fast5_filepath: str) -> None:
    """Worker function that fetches information from BAM file and writes it to a raw
    FAST5 file in order to convert it into a basecalled FAST5 file. Unfortunately, some
    information such as the trace table can never be recreated due to a change in the
    basecaller design.

    Params:
        worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor.
        fast5_filepath (str): Path to the raw FAST5 file.

    Returns:
        None
    """
    # First create extra reads for duplex/chimeric reads that are split
    # into two reads in the BAM file
    create_duplex_reads_in_fast5(worker_state, fast5_filepath)

    # Delete reads that are not in the BAM file
    delete_nonexistant_reads_from_fast5(worker_state, fast5_filepath)

    # Handle all the simplex/unfused reads
    mf5 = MultiFast5File(fast5_filepath, mode="r+")
    read_ids_list = mf5.get_read_ids()

    for read_id in read_ids_list:
        f5_read = mf5.get_read(read_id)
        try:
            data = extract_simplex_data_from_db(
                read_id,
                "read_id",
                worker_state=worker_state,
                fetch_multiple=False,
            )
            if isinstance(data, dict):
                add_info_to_fast5_read(f5_read, data, read_id)
            else:
                logger.error("Data for read_id {} is not in expected format", read_id)
        except Exception:
            logger.debug("Error in read_id {} in file {}.", read_id, fast5_filepath)
    mf5.close()

    # Delete configuration groups
    cleanup_fast5(fast5_filepath)

    # For duplex/chimeric reads, split the original raw signal properly between the two reads
    # The split raw signal is not VBZ compressed
    split_raw_signal_for_duplex_reads(worker_state, fast5_filepath)

copy_group(source_group: Any, target_group: Any) -> None

Recursively copies a group and its contents to a target group in an HDF5 file.

Parameters:

- source_group (Any): HDF5 source group. (required)
- target_group (Any): HDF5 target group. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def copy_group(source_group: Any, target_group: Any) -> None:
    """Recursively copies a group and its contents to a target group in an HDF5 file.

    Params:
        source_group (Any): HDF5 source group.
        target_group (Any): HDF5 target group.

    Returns:
        None
    """
    # Copy attributes from the source group to the target group
    for attr_name, attr_value in source_group.attrs.items():
        if attr_name == "read_id":
            attr_value = target_group.name.split("_")[-1]
        target_group.attrs[attr_name] = attr_value

    # Recursively copy subgroups and datasets from the source group to the target group
    for name, obj in source_group.items():
        if isinstance(obj, h5py.Group):
            new_subgroup = target_group.create_group(name)
            copy_group(obj, new_subgroup)
        elif isinstance(obj, h5py.Dataset):
            # Copy the dataset to the target group
            source_group.copy(obj, target_group, name=name)

create_duplex_reads_in_fast5(worker_state: Dict[str, Any], fast5_filepath: str) -> None

Creates extra reads for duplex/chimeric reads that are split into two reads in the BAM file. It copies all the information in the parent read group and renames it to the child read group.

Parameters:

- worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor. (required)
- fast5_filepath (str): Path to the raw FAST5 file. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def create_duplex_reads_in_fast5(
    worker_state: Dict[str, Any], fast5_filepath: str
) -> None:
    """Creates extra reads for duplex/chimeric reads that are split into two reads in the BAM file.
    It copies all the information in the parent read group and renames it to the child read group.

    Params:
        worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor.
        fast5_filepath (str): Path to the raw FAST5 file.

    Returns:
        None
    """
    data = extract_read_data_from_db_based_on_action(
        fast5_filepath, action="duplicate", worker_state=worker_state
    )

    if data is None:
        return

    for record in data:
        read_id = record.get("read_id")
        parent_read_id = record.get("parent_read_id")
        original_group_name = f"/read_{parent_read_id}"
        new_group_name = f"/read_{read_id}"
        duplicate_and_rename_group(fast5_filepath, original_group_name, new_group_name)

delete_nonexistant_reads_from_fast5(worker_state: Dict[str, Any], fast5_filepath: str) -> None

Deletes reads that are:

a. Present in the FAST5 file but have no corresponding record in the BAM file
b. Duplex parent reads with no corresponding record in the BAM file

Parameters:

- worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor. (required)
- fast5_filepath (str): Path to the raw FAST5 file. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def delete_nonexistant_reads_from_fast5(
    worker_state: Dict[str, Any], fast5_filepath: str
) -> None:
    """Deletes reads that are:
        a. Not in the BAM file but have no corresponding record in FAST5 file
        b. Duplex parent reads for which with no corresponding record in BAM file

    Params:
        worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor.
        fast5_filepath (str): Path to the raw FAST5 file.

    Returns:
        None
    """
    data_del = extract_read_data_from_db_based_on_action(
        fast5_filepath, action="delete", worker_state=worker_state
    )

    # Check if combined_data is still empty
    if not data_del:
        return

    with h5py.File(fast5_filepath, "r+") as file:
        for record in data_del:
            read_id = record.get("read_id")
            group_name = f"/read_{read_id}"
            # Use the `del` statement to delete the group
            del file[group_name]

duplicate_and_rename_group(input_file: str, original_group_name: str, new_group_name: str) -> None

Duplicates a read group in an HDF5 file and renames it.

Parameters:

- input_file (str): Path to the HDF5 file. (required)
- original_group_name (str): Name of the original group. (required)
- new_group_name (str): Name of the new group. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def duplicate_and_rename_group(
    input_file: str, original_group_name: str, new_group_name: str
) -> None:
    """Duplicates a read group in an HDF5 file and renames it.

    Params:
        input_file (str): Path to the HDF5 file.
        original_group_name (str): Name of the original group.
        new_group_name (str): Name of the new group.

    Returns:
        None
    """
    with h5py.File(input_file, "r+") as file:
        # Check if the original group exists
        if original_group_name not in file:
            print(f"The group '{original_group_name}' does not exist in the HDF5 file.")
            return

        # Read the original group
        original_group = file[original_group_name]

        # Create a new group with the desired new name
        new_group = file.create_group(new_group_name)

        # Copy contents of the original group to the new group
        copy_group(original_group, new_group)

extract_read_data_from_db_based_on_action(fast5_filepath: str, action: str, worker_state: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]

Extracts information from the database for a given FAST5 file based on a given action.

Parameters:

- fast5_filepath (str): Path to the raw FAST5 file. (required)
- action (str): Action to query ('duplicate' or 'delete'). (required)
- worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor. (required)

Returns:

- Optional[List[Dict[str, Any]]]: List of dictionaries containing BAM record information from the database, or None if no records are found.

Source code in src/fast5_rekindler/collate.py
def extract_read_data_from_db_based_on_action(
    fast5_filepath: str, action: str, worker_state: Dict[str, Any]
) -> Optional[List[Dict[str, Any]]]:
    """
    Extracts information from the database for a given FAST5 file
    based on a given action.

    Params:
        fast5_filepath (str): Path to the raw FAST5 file.
        action (str): Action to query ('duplicate' or 'delete').
        worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor.

    Returns:
        Optional[List[Dict[str, Any]]]: List of dictionaries containing BAM record information from the database.
    """
    cursor = worker_state.get("db_cursor")
    assert cursor is not None, "Database cursor is unexpectedly None"

    query = """SELECT
                    read_id,
                    parent_read_id,
                    fast5_filepath,
                    block_stride,
                    called_events,
                    mean_qscore,
                    sequence_length,
                    duration_template,
                    first_sample_template,
                    num_events_template,
                    moves_table,
                    read_fasta,
                    read_quality,
                    action,
                    split_point
                FROM fast5_bam_db
                WHERE fast5_filepath = ? AND action = ?
            """

    # Execute the query with the provided ID value
    cursor.execute(query, (fast5_filepath, action))

    # Define the columns
    columns = [
        "read_id",
        "parent_read_id",
        "fast5_filepath",
        "block_stride",
        "called_events",
        "mean_qscore",
        "sequence_length",
        "duration_template",
        "first_sample_template",
        "num_events_template",
        "moves_table",
        "read_fasta",
        "read_quality",
        "action",
        "split_point",
    ]

    rows = cursor.fetchall()

    if not rows:
        return None

    return [dict(zip(columns, row)) for row in rows]

extract_simplex_data_from_db(id_value: str, id_type: str, worker_state: Dict[str, Any], fetch_multiple: bool = False) -> Union[Dict[str, Any], List[Dict[str, Any]], None]

Extracts information from the database for a given read ID or parent read ID.

Parameters:

- id_value (str): Value of the read ID or parent read ID. (required)
- id_type (str): Type of ID to query ('read_id' or 'parent_read_id'). (required)
- worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor. (required)
- fetch_multiple (bool): Flag to fetch multiple records or a single record. (default: False)

Returns:

- Union[Dict[str, Any], List[Dict[str, Any]], None]: A single dictionary containing BAM record information from the database (or None if no record is found), or a list of such dictionaries when fetch_multiple is True.

Source code in src/fast5_rekindler/collate.py
def extract_simplex_data_from_db(
    id_value: str,
    id_type: str,
    worker_state: Dict[str, Any],
    fetch_multiple: bool = False,
) -> Union[Dict[str, Any], List[Dict[str, Any]], None]:
    """
    Extracts information from the database for a given read ID or parent read ID.

    Params:
        id_value (str): Value of the read ID or parent read ID.
        id_type (str): Type of ID to query ('read_id' or 'parent_read_id').
        worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor.
        fetch_multiple (bool): Flag to fetch multiple records or a single record.

    Returns:
        Union[Dict[str, Any], List[Dict[str, Any]], None]:
        - A dictionary containing BAM record information from the database, or None if no record is found.
        - A list of such dictionaries when fetch_multiple is True.
    """
    cursor = worker_state.get("db_cursor")
    assert cursor is not None, "Database cursor is unexpectedly None"

    query = f"""SELECT
                    read_id,
                    parent_read_id,
                    fast5_filepath,
                    block_stride,
                    called_events,
                    mean_qscore,
                    sequence_length,
                    duration_template,
                    first_sample_template,
                    num_events_template,
                    moves_table,
                    read_fasta,
                    read_quality,
                    action,
                    time_stamp
                FROM fast5_bam_db
                WHERE {id_type} = ?"""

    # Execute the query with the provided ID value
    cursor.execute(query, (id_value,))

    # Define the columns
    columns = [
        "read_id",
        "parent_read_id",
        "fast5_filepath",
        "block_stride",
        "called_events",
        "mean_qscore",
        "sequence_length",
        "duration_template",
        "first_sample_template",
        "num_events_template",
        "moves_table",
        "read_fasta",
        "read_quality",
        "action",
        "time_stamp",
    ]

    if fetch_multiple:
        # Fetch and yield the results one by one
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    else:
        # Fetch the first result
        row = cursor.fetchone()

        # If a row is fetched, convert it to a dictionary
        if row:
            return dict(zip(columns, row))
        else:
            return None  # No record found for the given ID
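
A self-contained sketch of the worker_state lookup pattern used here (the in-memory database and three-column schema are illustrative stand-ins for the real fast5_bam_db table):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fast5_bam_db (read_id TEXT, parent_read_id TEXT, mean_qscore REAL)"
)
conn.execute("INSERT INTO fast5_bam_db VALUES ('r1', '', 12.5)")
worker_state = {"db_connection": conn, "db_cursor": conn.cursor()}

cursor = worker_state["db_cursor"]
cursor.execute(
    "SELECT read_id, parent_read_id, mean_qscore FROM fast5_bam_db WHERE read_id = ?",
    ("r1",),
)
row = cursor.fetchone()
columns = ["read_id", "parent_read_id", "mean_qscore"]
print(dict(zip(columns, row)) if row else None)  # {'read_id': 'r1', ...}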

make_fast5_files_list(directory: str) -> List[str]

Makes a list of all FAST5 files in a directory tree.

Parameters:

- directory (str): Path to the directory containing generated raw FAST5 files. (required)

Returns:

- List[str]: List of all FAST5 file paths.

Source code in src/fast5_rekindler/collate.py
def make_fast5_files_list(directory: str) -> List[str]:
    """Makes a list of all FAST5 files in a directory tree.

    Params:
        directory (str): Path to the directory containing generated raw FAST5 files.

    Returns:
        List[str]: List of all FAST5 file paths.

    """
    fast5_files = []
    # os.walk generates the file names in a directory tree
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(".fast5"):
                # os.path.join constructs a full file path
                full_path = os.path.join(root, file)
                fast5_files.append(full_path)
    return fast5_files

split_hdf5_signal_array(fileobj: Any, dataset_name: str, start_idx: int, end_idx: int) -> None

Splits the raw signal array of a read into two parts and stores them in a new dataset.

Parameters:

- fileobj (Any): File object of the HDF5 file. (required)
- dataset_name (str): Name of the dataset containing the raw signal array. (required)
- start_idx (int): Start index of the split. (required)
- end_idx (int): End index of the split. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def split_hdf5_signal_array(
    fileobj: Any, dataset_name: str, start_idx: int, end_idx: int
) -> None:
    """
    Splits the raw signal array of a read into two parts and stores them in a new dataset.

    Params:
        fileobj (Any): File object of the HDF5 file.
        dataset_name (str): Name of the dataset containing the raw signal array.
        start_idx (int): Start index of the split.
        end_idx (int): End index of the split.

    Returns:
        None
    """
    # Read the original dataset
    original_dataset = fileobj[dataset_name]

    # Perform the trim operation based on start and end indices
    trimmed_data = original_dataset[start_idx:end_idx]

    # Delete the original dataset
    del fileobj[dataset_name]

    # Create a new dataset with the same name and store the trimmed data
    fileobj.create_dataset(dataset_name, data=trimmed_data)
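
A self-contained sketch (file name and signal values are illustrative): the dataset is trimmed in place by deleting and recreating it under the same name:

import h5py
import numpy as np

from fast5_rekindler.collate import split_hdf5_signal_array

with h5py.File("demo_signal.fast5", "w") as f:
    f.create_dataset("/read_r1/Raw/Signal", data=np.arange(10, dtype="int16"))

with h5py.File("demo_signal.fast5", "r+") as f:
    split_hdf5_signal_array(f, "/read_r1/Raw/Signal", start_idx=2, end_idx=6)
    print(f["/read_r1/Raw/Signal"][:])  # [2 3 4 5]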

split_raw_signal_for_duplex_reads(worker_state: Dict[str, Any], fast5_filepath: str) -> None

Splits the raw signal of a duplex read into its child reads.

Parameters:

- worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor. (required)
- fast5_filepath (str): Path to the raw FAST5 file. (required)

Returns:

- None

Source code in src/fast5_rekindler/collate.py
def split_raw_signal_for_duplex_reads(
    worker_state: Dict[str, Any], fast5_filepath: str
) -> None:
    """Splits the raw signal of a duplex read into its child reads.

    Params:
        worker_state (Dict[str, Any]): Dictionary containing the database connection and cursor.
        fast5_filepath (str): Path to the raw FAST5 file.

    Returns:
        None

    """
    data = extract_read_data_from_db_based_on_action(
        fast5_filepath, action="duplicate", worker_state=worker_state
    )
    if data is None:
        return
    with h5py.File(fast5_filepath, "r+") as fileobj:
        for record in data:
            read_id = record.get("read_id")
            dataset_name = f"/read_{read_id}/Raw/Signal"
            split_point = record.get("split_point")
            first_sample_template = record.get("first_sample_template")
            if first_sample_template is None or split_point is None:
                logger.error(
                    "Missing first_sample_template or split_point for read_id: {}",
                    read_id,
                )
                continue
            first_sample_template += split_point

            duration = record.get("duration_template")
            if duration > first_sample_template:
                # This is the first of the chimeric read's child
                end_idx = duration
            else:
                # This is the second of the chimeric read's child
                end_idx = first_sample_template + duration
            # assert that end_idx is integer and greater than first_sample_template
            assert isinstance(end_idx, int) and end_idx > first_sample_template
            split_hdf5_signal_array(
                fileobj, dataset_name, start_idx=first_sample_template, end_idx=end_idx
            )

index

DatabaseHandler

Database handler for multiprocessing.

Source code in src/fast5_rekindler/index.py
class DatabaseHandler:
    """Database handler for multiprocessing."""

    def __init__(self, output_dir: str, num_processes: int) -> None:
        self.output_dir = output_dir
        self.num_processes = num_processes

    def init_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
        """Initializes the database connection for each worker."""
        unique_db_path = os.path.join(self.output_dir, f"tmp_worker_{worker_id}.db")
        worker_state["db_connection"] = sqlite3.connect(unique_db_path)
        worker_state["db_cursor"] = worker_state["db_connection"].cursor()
        worker_state["db_cursor"].execute(TABLE_INIT_QUERY)
        worker_state["db_connection"].commit()

    def exit_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
        """Closes the database connection for each worker."""
        conn = worker_state["db_connection"]
        cursor = worker_state["db_cursor"]
        conn.commit()
        cursor.close()
        conn.close()

    def merge_databases(self) -> None:
        """Merges the databases from each worker into a single database."""
        db_path = os.path.join(self.output_dir, "index_db.db")
        main_conn = sqlite3.connect(db_path)
        main_cursor = main_conn.cursor()
        main_cursor.execute(TABLE_INIT_QUERY)
        for i in range(self.num_processes):
            worker_db_path = os.path.join(self.output_dir, f"tmp_worker_{i}.db")
            main_cursor.execute(f"ATTACH DATABASE '{worker_db_path}' AS worker_db")
            main_cursor.execute("BEGIN")
            main_cursor.execute(
                """
                INSERT OR IGNORE INTO index_db
                SELECT * FROM worker_db.index_db
            """
            )
            main_cursor.execute("COMMIT")
            main_cursor.execute("DETACH DATABASE worker_db")
            os.remove(worker_db_path)
        main_conn.commit()
        main_conn.close()

exit_func(worker_id: int, worker_state: Dict[str, Any]) -> None

Closes the database connection for each worker.

Source code in src/fast5_rekindler/index.py
def exit_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
    """Closes the database connection for each worker."""
    conn = worker_state["db_connection"]
    cursor = worker_state["db_cursor"]
    conn.commit()
    cursor.close()
    conn.close()

init_func(worker_id: int, worker_state: Dict[str, Any]) -> None

Initializes the database connection for each worker.

Source code in src/fast5_rekindler/index.py
def init_func(self, worker_id: int, worker_state: Dict[str, Any]) -> None:
    """Initializes the database connection for each worker."""
    unique_db_path = os.path.join(self.output_dir, f"tmp_worker_{worker_id}.db")
    worker_state["db_connection"] = sqlite3.connect(unique_db_path)
    worker_state["db_cursor"] = worker_state["db_connection"].cursor()
    worker_state["db_cursor"].execute(TABLE_INIT_QUERY)
    worker_state["db_connection"].commit()

merge_databases() -> None

Merges the databases from each worker into a single database.

Source code in src/fast5_rekindler/index.py
def merge_databases(self) -> None:
    """Merges the databases from each worker into a single database."""
    db_path = os.path.join(self.output_dir, "index_db.db")
    main_conn = sqlite3.connect(db_path)
    main_cursor = main_conn.cursor()
    main_cursor.execute(TABLE_INIT_QUERY)
    for i in range(self.num_processes):
        worker_db_path = os.path.join(self.output_dir, f"tmp_worker_{i}.db")
        main_cursor.execute(f"ATTACH DATABASE '{worker_db_path}' AS worker_db")
        main_cursor.execute("BEGIN")
        main_cursor.execute(
            """
            INSERT OR IGNORE INTO index_db
            SELECT * FROM worker_db.index_db
        """
        )
        main_cursor.execute("COMMIT")
        main_cursor.execute("DETACH DATABASE worker_db")
        os.remove(worker_db_path)
    main_conn.commit()
    main_conn.close()

build_index_db(fast5_dir: str, output_dir: str, num_processes: int) -> None

Builds an index mapping read_ids to FAST5 file paths.

Parameters:

- fast5_dir (str): Path to a FAST5 file or directory of FAST5 files. (required)
- output_dir (str): Path to an output directory. (required)
- num_processes (int): Number of processes to use. (required)

Returns:

- None

Source code in src/fast5_rekindler/index.py
def build_index_db(fast5_dir: str, output_dir: str, num_processes: int) -> None:
    """
    Builds an index mapping read_ids to FAST5 file paths.

    Params:
        fast5_dir (str): Path to a FAST5 file or directory of FAST5 files.
        output_dir (str): Path to an output directory.
        num_processes (int): Number of processes to use.

    Returns:
        None
    """
    # Ensure output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    total_files = sum(1 for _ in generate_fast5_file_paths(fast5_dir))

    num_processes = min(num_processes, total_files)
    db_handler = DatabaseHandler(output_dir, num_processes)

    with WorkerPool(
        n_jobs=num_processes, use_worker_state=True, pass_worker_id=True
    ) as pool:
        pool.map(
            build_index_db_worker,
            [
                (fast5_filepath,)
                for fast5_filepath in generate_fast5_file_paths(fast5_dir)
            ],
            iterable_len=total_files,
            worker_init=db_handler.init_func,
            worker_exit=db_handler.exit_func,
            progress_bar=True,
            progress_bar_options={"desc": "", "unit": "files", "colour": "green"},
        )
    db_handler.merge_databases()
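
A hedged usage sketch, assuming the module path implied by the source listing (fast5_rekindler.index); the directories are illustrative:

from fast5_rekindler.index import build_index_db, find_database_size

# Build the read_id -> FAST5 filepath index, then report how many
# records it holds.
build_index_db(
    fast5_dir="/data/run01/fast5",
    output_dir="/data/run01/rekindler_out",
    num_processes=4,
)
print(find_database_size("/data/run01/rekindler_out"), "reads indexed")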

build_index_db_worker(worker_id: int, worker_state: Dict[str, Any], fast5_filepath: str) -> None

Builds an index mapping read_ids to FAST5 file paths. Every worker has its own database.

Parameters:

Name Type Description Default
fast5_filepath str

Path to a FAST5 file.

required
worker_id int

Worker ID.

required
worker_state Dict[str, Any]

Worker state dictionary.

required

Returns:

Type Description
None

None

Source code in src/fast5_rekindler/index.py
def build_index_db_worker(
    worker_id: int, worker_state: Dict[str, Any], fast5_filepath: str
) -> None:
    """
    Builds an index mapping read_ids to FAST5 file paths. Every worker has its own database.

    Params:
        fast5_filepath (str): Path to a FAST5 file.
        worker_id (int): Worker ID.
        worker_state (Dict[str, Any]): Worker state dictionary.

    Returns:
        None
    """
    read_ids = get_readids(fast5_filepath)
    data = prepare_data(read_ids, fast5_filepath)
    write_database(data, worker_state["db_cursor"], worker_state["db_connection"])

find_database_size(output_dir: str) -> Any

Find the number of records in the database.

Parameters:

Name Type Description Default
output_dir str

Path to the database directory.

required

Returns:

Name Type Description
size Any

Number of records in the database.

Source code in src/fast5_rekindler/index.py
def find_database_size(output_dir: str) -> Any:
    """
    Find the number of records in the database.

    Params:
        output_dir (str): Path to the database directory.

    Returns:
        size (Any): Number of records in the database.
    """
    database_path = os.path.join(output_dir, "index_db.db")
    conn = sqlite3.connect(database_path)
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM index_db")
    size = cursor.fetchone()[0]
    conn.close()  # release the connection; the count has been fetched
    return size

generate_fast5_file_paths(fast5_dir: str) -> Generator

Traverse the directory and yield all the fast5 file paths.

Parameters:

Name Type Description Default
fast5_dir str

Path to a FAST5 file or directory of FAST5 files.

required

Returns:

Name Type Description
fast5_filepath str

Path to a FAST5 file.

Source code in src/fast5_rekindler/index.py
def generate_fast5_file_paths(fast5_dir: str) -> Generator:
    """Traverse the directory and yield all the fast5 file paths.

    Params:
        fast5_dir (str): Path to a FAST5 file or directory of FAST5 files.

    Returns:
        fast5_filepath (str): Path to a FAST5 file.
    """

    if os.path.isdir(fast5_dir):
        for root, _, files in os.walk(fast5_dir):
            for file in files:
                if file.endswith(".fast5"):
                    yield os.path.join(root, file)
    elif os.path.isfile(fast5_dir) and fast5_dir.endswith(".fast5"):
        yield fast5_dir
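
A small usage sketch (paths illustrative); the same call works for a directory tree or a single .fast5 file:

from fast5_rekindler.index import generate_fast5_file_paths

# Yields every .fast5 path under the directory, recursively.
for fast5_path in generate_fast5_file_paths("/data/run01/fast5"):
    print(fast5_path)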

get_readids(fast5_filepath: str) -> List[str]

Get a list of read_ids from a FAST5 file.

Parameters:

Name Type Description Default
fast5_filepath str

Path to a FAST5 file.

required

Returns:

Name Type Description
read_ids List[str]

List of read_ids.

Source code in src/fast5_rekindler/index.py
def get_readids(fast5_filepath: str) -> List[str]:
    """
    Get a list of read_ids from a FAST5 file.

    Params:
        fast5_filepath (str): Path to a FAST5 file.

    Returns:
        read_ids (List[str]): List of read_ids.
    """
    read_ids = []
    with get_fast5_file(fast5_filepath, mode="r") as f5:
        for read in f5.get_reads():
            read_ids.append(read.read_id)
    return read_ids

prepare_data(read_ids: List[str], fast5_filepath: str) -> List[Tuple[str, str]]

Prepares (read_id, fast5_filepath) tuples for insertion into the database.

Parameters:

Name Type Description Default
read_ids List[str]

List of read_ids.

required
fast5_filepath str

Path to a FAST5 file.

required

Returns:

Name Type Description
data List[Tuple[str, str]]

List of (read_id, fast5_filepath) tuples.

Source code in src/fast5_rekindler/index.py
def prepare_data(read_ids: List[str], fast5_filepath: str) -> List[Tuple[str, str]]:
    """
    Prepares (read_id, fast5_filepath) tuples for insertion into the database.

    Params:
        read_ids (List[str]): List of read_ids.
        fast5_filepath (str): Path to a FAST5 file.

    Returns:
        data (List[Tuple[str, str]]): List of (read_id, fast5_filepath) tuples.
    """
    return [(read_id, fast5_filepath) for read_id in read_ids]

write_database(data: List[Tuple[str, str]], cursor: sqlite3.Cursor, conn: sqlite3.Connection) -> None

Write the index data (read_id, filepath) to the database.

Parameters:

Name Type Description Default
data List[Tuple[str, str]]

List of (read_id, fast5_filepath) tuples.

required
cursor Cursor

Cursor object for the database.

required
conn Connection

Connection object for the database.

required

Returns:

Type Description
None

None

Source code in src/fast5_rekindler/index.py
def write_database(
    data: List[Tuple[str, str]], cursor: sqlite3.Cursor, conn: sqlite3.Connection
) -> None:
    """
    Write the index data (read_id, filepath) to the database.

    Params:
        data (List[Tuple[str, str]]): List of (read_id, fast5_filepath) tuples.
        cursor (sqlite3.Cursor): Cursor object for the database.
        conn (sqlite3.Connection): Connection object for the database.

    Returns:
        None
    """
    cursor.executemany(
        "INSERT OR IGNORE INTO index_db (read_id, fast5_filepath) VALUES (?, ?)", data
    )
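
TABLE_INIT_QUERY is defined elsewhere in index.py and is not reproduced on this page. A schema along these lines would be compatible with the INSERT above; the column definitions are inferred from the query, so treat this as an assumption rather than the library's verbatim definition:

# Hypothetical reconstruction -- not copied from the library source.
TABLE_INIT_QUERY = """
CREATE TABLE IF NOT EXISTS index_db (
    read_id TEXT PRIMARY KEY,
    fast5_filepath TEXT
)
"""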

join_dbs

join_databases(output_dir: str) -> str

Merges the index_db and bam_db databases into a single output database fast5_bam_db.db.

Parameters:

Name Type Description Default
output_dir str

Path to the output directory.

required

Returns:

Name Type Description
output_db_path str

Path to the output database.

Source code in src/fast5_rekindler/join_dbs.py
def join_databases(output_dir: str) -> str:
    """Merges the index_db and bam_db databases into a
    single output database fast5_bam_db.db.

    Params:
        output_dir (str): Path to the output directory.

    Returns:
        output_db_path (str): Path to the output database.
    """
    bam_db_path = os.path.join(output_dir, "bam_db.db")
    index_db_path = os.path.join(output_dir, "index_db.db")
    output_db_path = os.path.join(output_dir, "fast5_bam_db.db")

    if os.path.exists(output_db_path):
        os.remove(output_db_path)
    # Create output database connection
    conn_output = sqlite3.connect(output_db_path)
    cursor_output = conn_output.cursor()

    # Create the table in the output database
    cursor_output.execute(
        """CREATE TABLE IF NOT EXISTS fast5_bam_db (
                            read_id TEXT PRIMARY KEY,
                            parent_read_id TEXT,
                            fast5_filepath TEXT,
                            block_stride INTEGER,
                            called_events INTEGER,
                            mean_qscore FLOAT,
                            sequence_length INTEGER,
                            duration_template INTEGER,
                            first_sample_template INTEGER,
                            split_point INTEGER,
                            num_events_template INTEGER,
                            moves_table BLOB,
                            read_fasta TEXT,
                            read_quality TEXT,
                            time_stamp TIMESTAMP,
                            action TEXT
                        )"""
    )

    # Attach both databases to the new database connection
    cursor_output.execute(f"ATTACH DATABASE '{index_db_path}' AS idx")
    cursor_output.execute(f"ATTACH DATABASE '{bam_db_path}' AS bam")

    # Perform the JOIN operation and insert the data into the new database
    # - First join selects primary reads
    # - Second join selects duplex reads (reads that share a common parent read)
    # - Third join selects reads that are in FAST5 files but not in the BAM file
    cursor_output.execute(
        """
        INSERT INTO fast5_bam_db
        SELECT * FROM (
            SELECT b.read_id,
                b.parent_read_id,
                i.fast5_filepath,
                b.block_stride,
                b.called_events,
                b.mean_qscore,
                b.sequence_length,
                b.duration_template,
                b.first_sample_template,
                b.split_point,
                b.num_events_template,
                b.moves_table,
                b.read_fasta,
                b.read_quality,
                b.time_stamp,
                'process' AS action
            FROM bam.bam_db AS b
            INNER JOIN idx.index_db AS i ON b.read_id = i.read_id

            UNION ALL

            SELECT b.read_id,
                b.parent_read_id,
                i.fast5_filepath,
                b.block_stride,
                b.called_events,
                b.mean_qscore,
                b.sequence_length,
                b.duration_template,
                b.first_sample_template,
                b.split_point,
                b.num_events_template,
                b.moves_table,
                b.read_fasta,
                b.read_quality,
                b.time_stamp,
                'duplicate' AS action
            FROM bam.bam_db AS b
            INNER JOIN idx.index_db AS i ON b.parent_read_id = i.read_id

            UNION ALL

            SELECT i.read_id,
                NULL AS parent_read_id,
                i.fast5_filepath,
                NULL AS block_stride,
                NULL AS called_events,
                NULL AS mean_qscore,
                NULL AS sequence_length,
                NULL AS duration_template,
                NULL AS first_sample_template,
                NULL AS split_point,
                NULL AS num_events_template,
                NULL AS moves_table,
                NULL AS read_fasta,
                NULL AS read_quality,
                NULL AS time_stamp,
                'delete' AS action
            FROM idx.index_db AS i
            LEFT JOIN bam.bam_db AS b ON i.read_id = b.read_id
            WHERE b.read_id IS NULL


        )
        """
    )

    # Commit the changes
    conn_output.commit()

    # Check if the data has been inserted
    cursor_output.execute("SELECT COUNT(*) FROM fast5_bam_db")
    row_count = cursor_output.fetchone()[0]
    if row_count > 0:
        # If data is inserted successfully, delete the original databases
        os.remove(index_db_path)
        os.remove(bam_db_path)
    else:
        print("No data was inserted. Original databases have not been removed.")

    # Close the connection
    cursor_output.close()
    conn_output.close()

    return output_db_path
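
A hedged usage sketch (output directory illustrative). After the join, every read carries an action tag ('process', 'duplicate', or 'delete') that downstream steps can query directly:

import sqlite3

from fast5_rekindler.join_dbs import join_databases

# Join index_db.db and bam_db.db, then summarize reads by action tag.
db_path = join_databases("/data/run01/rekindler_out")
conn = sqlite3.connect(db_path)
for action, count in conn.execute(
    "SELECT action, COUNT(*) FROM fast5_bam_db GROUP BY action"
):
    print(action, count)
conn.close()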

logger_config

configure_logger(new_log_directory: str) -> str

Configure the logger to log to a file.

Parameters:

Name Type Description Default
new_log_directory str

Path to the directory where the log file will be saved.

required

Returns:

Name Type Description
log_filepath str

Path to the log file.

Source code in src/fast5_rekindler/logger_config.py
def configure_logger(new_log_directory: str) -> str:
    """Configure the logger to log to a file.

    Params:
        new_log_directory (str): Path to the directory where the log file will be saved.

    Returns:
        log_filepath (str): Path to the log file.
    """
    global log_directory  # Declare log_directory as global to modify it

    if new_log_directory is not None:
        log_directory = (
            new_log_directory  # Update log directory if a new path is provided
        )

    # Ensure the log directory exists
    os.makedirs(log_directory, exist_ok=True)

    # Get current date and time
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d_%H-%M-%S")

    # Use the timestamp in the log file name
    app_version = get_version()
    log_filename = f"fast5_rekindler_v{app_version}_{timestamp}.log"

    log_filepath = os.path.join(log_directory, log_filename)

    # Configure logger to log to the file
    # No need to call logger.remove() as we want to keep the default stderr handler
    logger.add(log_filepath, format="{time} {level} {message}")

    # Now logs will be sent to both the terminal and log_filename
    logger.opt(depth=1).info(f"Started FAST5 Rekindler v{app_version}\n")
    return log_filepath
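
A minimal usage sketch (directory illustrative); after the call, loguru messages go to both stderr and the timestamped log file:

from loguru import logger

from fast5_rekindler.logger_config import configure_logger

# Adds a timestamped file sink next to the default stderr handler.
log_filepath = configure_logger("/data/run01/logs")
logger.info("Log file at {}", log_filepath)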

get_version() -> Any

Get the version of the app from pyproject.toml.

Returns:

Name Type Description
app_version Any

Version of the app.

Source code in src/fast5_rekindler/logger_config.py
def get_version() -> Any:
    """Get the version of the app from pyproject.toml.

    Returns:
        app_version (Any): Version of the app.
    """
    app_version = version("fast5_rekindler")
    return app_version

raw_fast5

convert_pod5_to_raw_fast5(input_dir: str, output_dir: str, num_processes: int = 4) -> None

Converts all pod5 files in input_dir to raw fast5 files and saves them in output_dir.

Parameters:

Name Type Description Default
input_dir str

Path to the directory containing the pod5 files.

required
output_dir str

Path to the output directory.

required
num_processes int

Number of processes to use. Default is 4.

4

Returns:

Type Description
None

None

Source code in src/fast5_rekindler/raw_fast5.py
def convert_pod5_to_raw_fast5(
    input_dir: str, output_dir: str, num_processes: int = 4
) -> None:
    """
    Converts all pod5 files in input_dir to raw fast5 files and saves them in output_dir.

    Params:
        input_dir (str): Path to the directory containing the pod5 files.
        output_dir (str): Path to the output directory.
        num_processes (int): Number of processes to use. Default is 4.

    Returns:
        None
    """
    p5_f5_filepaths, num_pod5_files = make_pod5_file_list(input_dir, output_dir)

    with WorkerPool(n_jobs=min(num_processes, num_pod5_files)) as pool:
        pool.map(
            convert_pod5_to_raw_fast5_worker,
            [
                (pod5_filepath, fast5_filepath)
                for pod5_filepath, fast5_filepath in p5_f5_filepaths
            ],
            iterable_len=num_pod5_files,
            progress_bar=True,
            progress_bar_options={"desc": "", "unit": "files", "colour": "green"},
        )
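
A hedged usage sketch (paths illustrative); the output tree mirrors the input tree, with one .fast5 written per .pod5:

from fast5_rekindler.raw_fast5 import convert_pod5_to_raw_fast5

# Convert a tree of POD5 files back to raw (pre-basecalling) FAST5 files.
convert_pod5_to_raw_fast5(
    input_dir="/data/run01/pod5",
    output_dir="/data/run01/fast5",
    num_processes=4,
)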

convert_pod5_to_raw_fast5_worker(pod5_filepath: str, fast5_filepath: str) -> None

Converts a single pod5 file to a raw fast5 file.

Parameters:

Name Type Description Default
pod5_filepath str

Path to the pod5 file.

required
fast5_filepath str

Path to the output fast5 file.

required

Returns:

Type Description
None

None

Source code in src/fast5_rekindler/raw_fast5.py
def convert_pod5_to_raw_fast5_worker(pod5_filepath: str, fast5_filepath: str) -> None:
    """Converts a single pod5 file to a raw fast5 file.

    Params:
        pod5_filepath (str): Path to the pod5 file.
        fast5_filepath (str): Path to the output fast5 file.

    Returns:
        None
    """
    # Fetch all the read ids in the POD5 file
    with p5.Reader(pod5_filepath) as reader:
        read_ids = reader.read_ids
    # Convert
    convert_pod5_to_fast5(Path(pod5_filepath), Path(fast5_filepath), read_ids)

get_pod5_filepath(read_id: str, db_cursor: sqlite3.Cursor) -> Any

Fetches the pod5 filepath for a given read_id from the SQLite index database.

Parameters:

Name Type Description Default
read_id str

The read_id of the read to be extracted.

required
db_cursor Cursor

Cursor object for the database.

required

Returns:

Name Type Description
pod5_filepath str

Path to the pod5 file.

Source code in src/fast5_rekindler/raw_fast5.py
def get_pod5_filepath(read_id: str, db_cursor: sqlite3.Cursor) -> Any:
    """Fetches the pod5 filepath for a given read_id
    from the SQLite index database.

    Params:
        read_id (str): The read_id of the read to be extracted.
        db_cursor (sqlite3.Cursor): Cursor object for the database.

    Returns:
        pod5_filepath (str): Path to the pod5 file.
    """
    db_cursor.execute("SELECT pod5_filepath FROM data WHERE read_id = ?", (read_id,))
    result = db_cursor.fetchone()
    return result[0] if result else None

make_pod5_file_list(input_dir: str, output_dir: str) -> Tuple[Iterator[Tuple[str, str]], int]

Recursively finds all pod5 files (*.pod5) in input_dir and builds the corresponding output fast5 paths under output_dir.

Parameters:

Name Type Description Default
input_dir str

Path to the directory containing the pod5 files.

required
output_dir str

Path to the output directory for the fast5 files.

required

Returns:

Name Type Description
pod5_file_list Tuple[Iterator[Tuple[str, str]], int]

An iterator of (pod5_filepath, fast5_filepath) pairs and the total number of pod5 files.

Source code in src/fast5_rekindler/raw_fast5.py
def make_pod5_file_list(
    input_dir: str, output_dir: str
) -> Tuple[Iterator[Tuple[str, str]], int]:
    """Recursively find all pod5 files (*.pod5) in input_dir and return a list of Path objects.

    Params:
        input_dir (str): Path to the directory containing the pod5 files.

    Returns:
        pod5_file_list (Tuple[Iterator[Tuple[str, str]], int]): A tuple of zipped pod5 and fast5 filepaths and total number of pod5 files.
    """

    pod5_file_list = []
    fast5_file_list = []
    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(".pod5"):
                # Construct the full path of the .pod5 file
                pod5_output_path = os.path.join(root, file)

                # Construct the corresponding .fast5 file path in the output directory
                relative_path = os.path.relpath(root, input_dir)
                fast5_output_path = os.path.join(
                    output_dir, relative_path, file.replace(".pod5", ".fast5")
                )

                # Create the subdirectory in the output directory if it doesn't exist
                os.makedirs(os.path.dirname(fast5_output_path), exist_ok=True)

                # Add the paths to the lists
                pod5_file_list.append(pod5_output_path)
                fast5_file_list.append(fast5_output_path)
    return zip(pod5_file_list, fast5_file_list), len(pod5_file_list)
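
One caveat worth noting (paths illustrative): the first element of the return value is a zip iterator, so it can be consumed only once:

from fast5_rekindler.raw_fast5 import make_pod5_file_list

# The zip object is single-use; wrap it in list() if you need to
# iterate over the pairs more than once.
pairs, n_files = make_pod5_file_list("/data/run01/pod5", "/data/run01/fast5")
for pod5_path, fast5_path in pairs:
    print(pod5_path, "->", fast5_path)
print(n_files, "pod5 files found")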