写在前面

上一篇介绍了 Snakemake 入门教程 做了一个简单的示例,具体查看我的上一篇内容

下面会介绍一下 Snakemake的常用参数以及进阶的用法

介绍之前大家可以看一个视频了解一下(时长:19min14s, 选择性观看)

参数介绍

命令行参数

  1. 内核数调用
1
2
3
4
$ snakemake --cores 1

# 指定多个可用内核
$ snakemake --cores 4

Snakemake 执行在同一目录中 名为 Snakefile 的文件中指定的工作流(Snakefile 可以通过参数 -s 给出)。

  1. 试运行
1
2
3
4
$ snakemake -n

# 试运行并打印试行内容
$ snakemake -n -r

可以进行试运行,测试工作流是否正确定义以及估计所需计算量。

Snakemake 工作流通常定义某些规则的使用线程数。 有时,覆盖工作流定义中给出的默认值可以通过使用--set-threads参数来完成,例如,

1
$ snakemake --cores 4 --set-threads myrule=2

将覆盖为rule myrule 定义的任何线程数,并使用2代替。 类似地,可以覆盖规则中的其他资源定义,通过

1
$ snakemake --cores 4 --set-resources myrule:partition="foo"

当与集群执行结合使用时,这两种机制都特别方便。

处理非常大的工作流程

Snakemake 允许批量运行大型工作流。这样,一次需要评估的文件更少,因此可以更快地推断出作业 DAG

1
2
3
4
5
6
7
8
# 指示仅计算规则myrule 的三批输入中的第一批
$ snakemake --cores 4 --batch myrule=1/3

# 生成第二批
$ snakemake --cores 4 --batch myrule=2/3

# 生成第三批
$ snakemake --cores 4 --batch myrule=3/3
一些参数介绍 具体内容
target 要建立的目标,可能是rule或文件
–dry-run, --dryrun, -n 不执行任何操作,并显示将要执行的操作。如果是一个非常大的工作流程,使用 –dry-run –quiet 仅打印作业 DAG 的摘要
–profile Snakemake 的配置文件的名称
–snakefile, -s 指定Snakefile,否则是当前目录下的Snakefile
–cores, -c , -j 并行使用 N 个 CPU cores/jobs
–resources, --res 指定程序运行的内存
–config 设置或覆盖工作流配置对象中的值
–directory, -d 指定工作目录
–touch, -t 将某文件标记为最新,不真正更改它们
–force 强制执行某一条
–forceall 强制执行某条Rule及它的依赖
–report 创建包含结果和统计信息的 HTML 报告
–list, -l 显示给定 Snakefile 中的可用rule
–dag 生成整个流程的有向无环图,不进行分析
–quiet, -q 不输出任何进度或规则信息
–all-temp 将所有输出文件标记为临时文件
–bash-completion 文件名、规则名和参数的 bash 补全

进阶用法

1. 指定使用的线程数

对于某些工具,建议使用多个线程以加快计算速度。Snakemake可以通过threads指令了解规则所需的“threads”。在示例工作流中,为规则bwa_map使用多个线程:

1
$ snakemake --cores 4 --set-threads myrule=2
1
2
3
4
5
6
7
8
9
rule bwa_map:
input:
"data/genome.fa",
"data/samples/{sample}.fastq"
output:
"mapped_reads/{sample}.bam"
threads: 8
shell:
"bwa mem -t {threads} {input} | samtools view -Sb - > {output}"

可以使用大括号表示法(即{threads})将线程数传到 shell 命令。如果没有threads指令,则默认使用 1 个线程。

执行工作流时,Snakemake 调度程序会考虑作业所需的线程数。 调度程序会确保同时运行的所有作业的线程总和不超过给定的可用 CPU 内核数。

这个数字是通过 --cores 命令行参数给出的,这对于实际运行工作流的 snakemake 调用是必需的。 例如

1
$ snakemake --cores 10

将使用 10 个内核执行工作流。 由于rule bwa_map 需要 8 个线程,因此一次只能运行该rule的一项作业,

Snakemake 调度程序将尝试用其他作业(例如 samtools_sort )来使剩余的内核饱和。

当提供的内核数少于线程数时,规则使用的线程数将减少到给定的内核数。

如果--cores 没有给出数字,则使用所有可用的核心。

使用标志--forceall,可以强制完全重新执行工作流。 将此标志与 --cores 的不同值结合起来,并检查调度程序如何选择要并行运行的作业。

除了非常常见的线程资源外,Snakemake 还提供了一个resources指令,可用于指定任意资源,例如内存使用或辅助计算设备(如 GPU)。与线程类似,当使用命令行参数提供该资源的可用量时,调度程序可以考虑 --resources( 参考资料

2. 配置文件

Snakemake 提供了一个配置文件机制。配置文件可以用JSONYAML编写,并与configfile指令一起使用。在我们的示例工作流程中,我们添加了以下行

1
configfile: "config.yaml"

写到 Snakefile 的顶部

Snakemake 将加载配置文件并将其内容存储到名为 config 的全局可用字典中。 在我们的例子中,将 config.yaml 中的示例指定为

1
2
3
samples:
A: data/samples/A.fastq
B: data/samples/B.fastq

现在,我们可以从 Snakefile 中删除定义 SAMPLES 的语句,并将规则 bcftools_call 更改为

1
2
3
4
5
6
7
8
9
10
rule bcftools_call:
input:
fa="data/genome.fa",
bam=expand("sorted_reads/{sample}.bam", sample=config["samples"]),
bai=expand("sorted_reads/{sample}.bam.bai", sample=config["samples"])
output:
"calls/all.vcf"
shell:
"samtools mpileup -g -f {input.fa} {input.bam} | "
"bcftools call -mv - > {output}"

3. 输入函数

由于我们已经在配置文件中存储了 FASTQ 文件的路径,我们也可以在 rule bwa_map 来使用这些路径。这种情况与我们上面修改的规则“bcftools_call”不同。

要理解这一点,重要的是要知道 Snakemake 工作流分三个阶段执行。

  1. 初始化阶段,定义工作流的文件被解析并实例化所有规则。

  2. DAG 阶段,通过填充通配符并将输入文件与输出文件匹配来构建所有作业的有向无环依赖图。

  3. 调度阶段,执行作业的 DAG,根据可用资源启动作业。

rule bcftools_call 的输入文件列表中的扩展函数在初始化阶段执行。

在这个阶段,我们不知道作业、通配符值和规则依赖关系。因此,在此阶段,我们无法从配置文件中确定rule bwa_map 的 FASTQ 路径,因为我们甚至不知道将从该rule生成哪些作业。相反,我们需要将输入文件的确定推迟到 DAG 阶段。这可以通过指定输入函数而不是输入指令内部的字符串来实现。对于规则bwa_map,其工作方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
def get_bwa_map_input_fastqs(wildcards):
return config["samples"][wildcards.sample]

rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
"mapped_reads/{sample}.bam"
threads: 8
shell:
"bwa mem -t {threads} {input} | samtools view -Sb - > {output}"

任何正常的功能都可以正常工作。 输入函数将 wildcards 对象作为单个参数,该对象允许通过属性(此处为 wildcards.sample )访问通配符值。 它们必须返回一个字符串或一个字符串列表,这些字符串被解释为输入文件的路径(这里,我们返回在配置文件中为 示例存储的路径 )。 一旦确定了作业的通配符值,就会评估输入函数。

当添加新的输入文件时,Snakemake 不会自动重新运行作业,如下所示。 但是,可以使用 snakemake --list-input-changes 获取受此类更改影响的输出文件列表。 要触发重新运行,这一点 bash 有帮助:

示例

data/samples 文件夹中,还有一个文件 C.fastq

将该文件添加到配置文件中,并查看当使用 snakemake -n --reason --forcerun bcftools_call 调用时,Snakemake 如何重新计算属于新示例的工作流部分。

4. rule 参数

有时,shell 命令不仅由inputoutput 文件以及一些静态标志组成。

特别是,可能需要根据作业的通配符值设置其他参数。 为此,Snakemake 允许使用 params 指令为 rule 定义任意参数。

有时,shell 命令不仅由输入和输出文件以及一些静态标志组成。 特别是,可能需要根据作业的通配符值设置其他参数。 为此,Snakemake 允许使用 params 指令为规则定义任意参数。

在我们的工作流程中,使用所谓的read groups注释对齐的reads是合理的,其中包含像sample名称这样的元数据。 我们相应地修改规则bwa_map

1
2
3
4
5
6
7
8
9
10
11
rule bwa_map:
input:
"data/genome.fa",
lambda wildcards: config["samples"][wildcards.sample]
output:
"mapped_reads/{sample}.bam"
params:
rg=r"@RG\tID:{sample}\tSM:{sample}"
threads: 8
shell:
"bwa mem -R '{params.rg}' -t {threads} {input} | samtools view -Sb - > {output}"

inputoutput 类似,可以从 shell 命令、基于 Python 的 run 块或脚本指令访问 params

params 指令也可以使用第 3 步中的函数将初始化推迟到 DAG 阶段。 与 input 函数相比,这些函数可以选择采用额外的参数 inputoutputthreadsresources

5. log文件记录

在执行大型工作流时,通常希望将每个作业的日志输出存储到单独的文件中,而不是仅仅将所有日志输出打印到终端(当多个作业并行运行时,这会导致输出混乱)

为此,Snakemake 允许为rule指定日志文件。 日志文件是通过 log 指令定义的,处理方式与输出文件类似,但它们不受rule匹配的影响,并且在作业失败时不会被清除。 修改规则 bwa_map 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
"mapped_reads/{sample}.bam"
params:
rg=r"@RG\tID:{sample}\tSM:{sample}"
log:
"logs/bwa_mem/{sample}.log"
threads: 8
shell:
"(bwa mem -R '{params.rg}' -t {threads} {input} | "
"samtools view -Sb - > {output}) 2> {log}"

以上内容通过修改 shell 命令以收集 bwasamtools 的 STDERR 输出,并将其通过管道传输到 {log} 引用的文件中。 日志文件必须包含与输出文件完全相同的通配符,以避免相同规则的不同作业之间的文件名冲突

最佳做法是将所有日志文件存储在子目录logs/中,并以 rule 或工具名称为前缀。

6. 临时文件和受保护文件

在工作流程中,为每个样本创建两个 BAM 文件,即rule bwa_mapsamtools_sort 的输出。 在不处理示例时,底层数据通常是巨大的。 因此,生成的 BAM 文件需要大量磁盘空间,并且它们的创建需要一些时间。

为了节省磁盘空间,您可以将输出文件标记为临时文件。 一旦所有消耗作业(需要它作为输入)都已执行,Snakemake 将为您删除标记的文件。 我们将这种机制用于规则 bwa_map 的输出文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
temp("mapped_reads/{sample}.bam")
params:
rg=r"@RG\tID:{sample}\tSM:{sample}"
log:
"logs/bwa_mem/{sample}.log"
threads: 8
shell:
"(bwa mem -R '{params.rg}' -t {threads} {input} | "
"samtools view -Sb - > {output}) 2> {log}"

一旦执行了后续的 samtools_sort 作业,将导致BAM 文件被删除 。 由于通过读取映射和排序创建 BAM 文件的计算成本很高,因此保护最终的 BAM 文件免遭意外删除或修改是合理的。

修改规则 samtools_sort 以将其输出文件标记为 protected

1
2
3
4
5
6
7
8
9
10
rule samtools_sort:
input:
"mapped_reads/{sample}.bam"
output:
protected("sorted_reads/{sample}.bam")
shell:
"samtools sort -T sorted_reads/{wildcards.sample} "
"-O bam {input} > {output}"

# 成功执行后,Snakemake 会对文件系统中的输出文件进行写保护,以免被意外覆盖或删除。

重新执行整个工作流程并观察 Snakemake 如何处理临时文件和受保护文件。
使用目标运行 Snakemake mapped_reads/A.bam。虽然该文件被标记为临时文件,但您会看到 Snakemake 并没有删除它,因为它被指定为目标文件。
尝试使用试运行选项再次重新执行整个工作流程。您将看到它失败,因为 Snakemake 无法覆盖受保护的输出文件。

总结

对于本教程,我们学习创建了一个config.yaml配置文件:

1
2
3
4
5
samples:
A: data/samples/A.fastq
B: data/samples/B.fastq

prior_mutation_rate: 0.001

有了这个,我们工作流程的最终版本Snakefile看起来像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
configfile: "config.yaml"

rule all:
input:
"plots/quals.svg"

def get_bwa_map_input_fastqs(wildcards):
return config["samples"][wildcards.sample]

rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
temp("mapped_reads/{sample}.bam")
params:
rg=r"@RG\tID:{sample}\tSM:{sample}"
log:
"logs/bwa_mem/{sample}.log"
threads: 8
shell:
"(bwa mem -R '{params.rg}' -t {threads} {input} | "
"samtools view -Sb - > {output}) 2> {log}"

rule samtools_sort:
input:
"mapped_reads/{sample}.bam"
output:
protected("sorted_reads/{sample}.bam")
shell:
"samtools sort -T sorted_reads/{wildcards.sample} "
"-O bam {input} > {output}"

rule samtools_index:
input:
"sorted_reads/{sample}.bam"
output:
"sorted_reads/{sample}.bam.bai"
shell:
"samtools index {input}"

rule bcftools_call:
input:
fa="data/genome.fa",
bam=expand("sorted_reads/{sample}.bam", sample=config["samples"]),
bai=expand("sorted_reads/{sample}.bam.bai", sample=config["samples"])
output:
"calls/all.vcf"
params:
rate=config["prior_mutation_rate"]
log:
"logs/bcftools_call/all.log"
shell:
"(samtools mpileup -g -f {input.fa} {input.bam} | "
"bcftools call -mv -P {params.rate} - > {output}) 2> {log}"

rule plot_quals:
input:
"calls/all.vcf"
output:
"plots/quals.svg"
script:
"scripts/plot-quals.py"