Skip to content
Open
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ suball.abPOA:
rm -fr ${INCLDIR}/simde && cp -r submodules/abPOA/include/simde ${INCLDIR}

suball.lastz:
cd submodules/lastz/src && sed -i Makefile -e 's/-lm -o/-lm $${LIBS} -o/g'
cd submodules/lastz/src && sed -i -e 's/-lm -o/-lm $${LIBS} -o/g' Makefile
cd submodules/lastz && LIBS="${jemallocLib}" ${MAKE}
ln -f submodules/lastz/src/lastz bin

Expand All @@ -321,7 +321,7 @@ suball.FASTGA:
ln -f submodules/FASTGA/GIXmake ${BINDIR}
ln -f submodules/FASTGA/GIXrm ${BINDIR}
suball.FASTAN:
cd submodules/FASTAN && sed -i Makefile -e 's/-lm -lz/-lm -lpthread -lz/g' && ${MAKE} || true
cd submodules/FASTAN && sed -i -e 's/-lm -lz/-lm -lpthread -lz/g' Makefile && ${MAKE} || true
ln -f submodules/FASTAN/FasTAN ${BINDIR}
suball.alntools:
cd submodules/alntools && ${MAKE}
Expand Down
45 changes: 24 additions & 21 deletions caf/impl/pinchIterator.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ typedef struct _pairwiseAlignmentToPinch {
Paf *(*getPairwiseAlignment)(void *);
Paf *paf;
int64_t xCoordinate, yCoordinate, xName, yName;
Cigar *op;
int64_t opIndex;
bool freeAlignments;
} PairwiseAlignmentToPinch;

Expand All @@ -63,43 +63,46 @@ static stPinch *pairwiseAlignmentToPinch_getNext(PairwiseAlignmentToPinch *pA, s
if (pA->paf == NULL) {
return NULL;
}
pA->op = pA->paf->cigar;
pA->opIndex = 0;
pA->xCoordinate = pA->paf->same_strand ? pA->paf->query_start : pA->paf->query_end;
pA->yCoordinate = pA->paf->target_start;
pA->xName = cactusMisc_stringToName(pA->paf->query_name);
pA->yName = cactusMisc_stringToName(pA->paf->target_name);
}
while (pA->op != NULL) {
assert(pA->op->length >= 1);
if (pA->op->op == match || pA->op->op == sequence_match || pA->op->op == sequence_mismatch) { //deal with the possibility of a zero length match (strange, but not illegal)
while (pA->opIndex < cigar_count(pA->paf->cigar)) {
CigarRecord *op = cigar_get(pA->paf->cigar, pA->opIndex);
assert(op->length >= 1);
if (op->op == match || op->op == sequence_match || op->op == sequence_mismatch) { //deal with the possibility of a zero length match (strange, but not illegal)
// Make maximal length (in case run of sequence matches and mismatches)
int64_t i=0; // Represents the length of the previous matches in the sequence
while(pA->op->next != NULL && (pA->op->next->op == match ||
pA->op->next->op == sequence_match ||
pA->op->next->op == sequence_mismatch)) {
i+=pA->op->length;
pA->op = pA->op->next;
while(pA->opIndex + 1 < cigar_count(pA->paf->cigar) &&
(cigar_get(pA->paf->cigar, pA->opIndex + 1)->op == match ||
cigar_get(pA->paf->cigar, pA->opIndex + 1)->op == sequence_match ||
cigar_get(pA->paf->cigar, pA->opIndex + 1)->op == sequence_mismatch)) {
i += op->length;
pA->opIndex++;
op = cigar_get(pA->paf->cigar, pA->opIndex);
}
if (pA->paf->same_strand) {
stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+pA->op->length,
stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+op->length,
1);
pA->xCoordinate += i+pA->op->length;
pA->xCoordinate += i+op->length;
} else {
pA->xCoordinate -= i+pA->op->length;
stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+pA->op->length,
pA->xCoordinate -= i+op->length;
stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+op->length,
0);
}
pA->yCoordinate += i+pA->op->length;
pA->op = pA->op->next;
pA->yCoordinate += i+op->length;
pA->opIndex++;
return pinchToFillOut;
}
if (pA->op->op != query_delete) {
pA->xCoordinate += pA->paf->same_strand ? pA->op->length : -pA->op->length;
if (op->op != query_delete) {
pA->xCoordinate += pA->paf->same_strand ? op->length : -op->length;
}
if (pA->op->op != query_insert) {
pA->yCoordinate += pA->op->length;
if (op->op != query_insert) {
pA->yCoordinate += op->length;
}
pA->op = pA->op->next;
pA->opIndex++;
}
if (pA->paf->same_strand) {
assert(pA->xCoordinate == pA->paf->query_end);
Expand Down
36 changes: 22 additions & 14 deletions caf/tests/pinchIteratorTest.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ static void testIterator(CuTest *testCase, stPinchIterator *pinchIterator, stLis
sscanf(paf->target_name, "%" PRIi64 "", &contigY);
int64_t x = paf->same_strand ? paf->query_start : paf->query_end;
int64_t y = paf->target_start;
Cigar *c = paf->cigar;
while(c != NULL) {
for (int64_t ci = 0; ci < cigar_count(paf->cigar); ci++) {
CigarRecord *c = cigar_get(paf->cigar, ci);
if (c->op == match) {
if (c->length > 2 * trim) {
stPinch *pinch = stPinchIterator_getNext(pinchIterator, &pinchToFillOut);
Expand All @@ -44,7 +44,6 @@ static void testIterator(CuTest *testCase, stPinchIterator *pinchIterator, stLis
if (c->op != query_insert) {
y += c->length;
}
c = c->next;
}
}
CuAssertPtrEquals(testCase, NULL, stPinchIterator_getNext(pinchIterator, &pinchToFillOut));
Expand All @@ -65,23 +64,32 @@ static stList *getRandomPairwiseAlignments() {
paf->target_start = st_randomInt(100000, 1000000);
paf->same_strand = st_random() > 0.5;
int64_t i = paf->query_start, j = paf->target_start;
Cigar **pc = &(paf->cigar);
CigarOp p_op_type = query_delete; // This is a fudge to ensure that we don't end up with two matches in succession
// in the cigar - because the pinch iterator will smush them together
int64_t capacity = 16, num_recs = 0;
CigarRecord *recs = st_malloc(capacity * sizeof(CigarRecord));
do {
Cigar *c = st_calloc(1, sizeof(Cigar));
c->length = st_randomInt(1, 10);
c->op = st_random() > 0.3 ? ((st_random() > 0.5 && p_op_type != match) ? match : query_insert): query_delete;
p_op_type = c->op;
if (c->op != query_delete) {
i += c->length;
if (num_recs == capacity) {
capacity *= 2;
recs = realloc(recs, capacity * sizeof(CigarRecord));
}
if (c->op != query_insert) {
j += c->length;
recs[num_recs].length = st_randomInt(1, 10);
recs[num_recs].op = st_random() > 0.3 ? ((st_random() > 0.5 && p_op_type != match) ? match : query_insert): query_delete;
p_op_type = recs[num_recs].op;
if (recs[num_recs].op != query_delete) {
i += recs[num_recs].length;
}
*pc = c;
pc = &(c->next);
if (recs[num_recs].op != query_insert) {
j += recs[num_recs].length;
}
num_recs++;
} while(st_random() > 0.1 || paf->query_start == i || paf->target_start == j);
Cigar *cigar = st_calloc(1, sizeof(Cigar));
cigar->recs = recs;
cigar->length = num_recs;
cigar->start = 0;
cigar->capacity = capacity;
paf->cigar = cigar;
paf->query_end = i;
paf->target_end = j;
paf->query_length = i;
Expand Down
7 changes: 7 additions & 0 deletions src/cactus/cactus_progressive_config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
<!-- pickIngroupPrimaryAlignmentsSeparatelyToOutgroups Separately make ingroups pick their primary alignment to
other ingroups without outgroups, then get the outgroups to pick their primary alignment to the ingroups. If 0
get every sequence to pick its primary alignment without regard to if the other sequence is an ingroup or outgroup -->
<!-- chainSplitQueryLength When running paf chaining pipeline divides pafs by query contig and runs the chaining for each contig
in parallel. To cope with small contigs, contigs smaller than chainSplitQueryLength length are aggregated into collections
of contigs not greater than this length. -->
<!-- chainSplitMinFileSize PAF files smaller than this size (in bytes) skip the query contig splitting and chain directly.
Set to 0 to always split when chainSplitQueryLength is enabled. -->
<!-- slurmChunkScale Multiply chunkSize and divide dechunkBatchSize by this value when running slurm in order to decrease job count -->
<blast chunkSize="30000000"
bigChunkSize="6000000000"
Expand Down Expand Up @@ -110,6 +115,8 @@
outputSecondaryAlignments="0"
dechunkBatchSize="1000"
pickIngroupPrimaryAlignmentsSeparatelyToOutgroups="1"
chainSplitQueryLength="100000000"
chainSplitMinFileSize="1000000000"
slurmChunkScale="3"
bigGenomeChunkScale="2"
>
Expand Down
93 changes: 83 additions & 10 deletions src/cactus/paf/local_alignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,39 +609,112 @@ def chain_alignments(job, alignment_files, alignment_names, reference_event_name


def chain_one_alignment(job, alignment_file, alignment_name, params, include_inverted_alignments):
""" run paffy chain on one PAF. include_inverted_alignemnts is a flag to control if we additionally include
the inverted paf records for chaining.
""" run paffy chain on one PAF. include_inverted_alignments is a flag to control if we additionally include
the inverted paf records for chaining. If chainSplitQueryLength is set in the config, splits the PAF by query
contig and chains each chunk in parallel before concatenating the results.
"""

work_dir = job.fileStore.getLocalTempDir()
alignment_path = os.path.join(work_dir, alignment_name + '.paf')
alignment_inv_path = os.path.join(work_dir, alignment_name + '.inv.paf')
output_path = os.path.join(work_dir, alignment_name + '.chained.paf')

# Copy the alignments from the job store
job.fileStore.readGlobalFile(alignment_file, alignment_path)

# Get the forward and reverse versions of each alignment for symmetry with chaining if include_inverted_alignments
# is set
# Append inverted alignments BEFORE splitting (inversion swaps query/target)
if include_inverted_alignments:
alignment_inv_path = os.path.join(work_dir, alignment_name + '.inv.paf')
shutil.copyfile(alignment_path, alignment_inv_path)
cactus_call(parameters=['paffy', 'invert', "-i", alignment_inv_path], outfile=alignment_path, outappend=True,
job_memory=job.memory)

# Now chain the alignments
cactus_call(parameters=['paffy', 'chain', "-i", alignment_path,
chain_params = ['paffy', 'chain',
"--maxGapLength", params.find("blast").attrib["chainMaxGapLength"],
"--chainGapOpen", params.find("blast").attrib["chainGapOpen"],
"--chainGapExtend", params.find("blast").attrib["chainGapExtend"],
"--trimFraction", params.find("blast").attrib["chainTrimFraction"],
"--logLevel", getLogLevelString()]

split_query_length = getOptionalAttrib(params.find("blast"), "chainSplitQueryLength",
typeFn=int, default=0)
chain_split_min_file_size = getOptionalAttrib(params.find("blast"), "chainSplitMinFileSize",
typeFn=int, default=0)
paf_file_size = os.path.getsize(alignment_path)

if paf_file_size >= chain_split_min_file_size:
# Split the PAF by query contig
split_dir = os.path.join(work_dir, 'split')
os.makedirs(split_dir)
split_prefix = os.path.join(split_dir, 'split_')
cactus_call(parameters=['paffy', 'split_file', '-i', alignment_path, '-q',
'-m', str(split_query_length), '-p', split_prefix],
job_memory=job.memory)
split_files = [os.path.join(split_dir, f) for f in sorted(os.listdir(split_dir))
if f.endswith('.paf')]
else:
split_files = []

if len(split_files) <= 1:
# No splitting or only one chunk — chain directly (no fan-out overhead)
input_path = split_files[0] if split_files else alignment_path
cactus_call(parameters=chain_params + ["-i", input_path],
outfile=output_path, job_memory=job.memory)
job.fileStore.deleteGlobalFile(alignment_file)
return job.fileStore.writeGlobalFile(output_path)

# Multiple chunks — fan out parallel chain jobs
root_job = Job()
job.addChild(root_job)

chained_chunk_ids = []
for split_file in split_files:
chunk_size = os.path.getsize(split_file)
chunk_file_id = job.fileStore.writeGlobalFile(split_file)
chained_chunk_ids.append(
root_job.addChildJobFn(chain_one_split_chunk, chunk_file_id, params,
disk=4 * chunk_size,
memory=cactus_clamp_memory(2 * chunk_size)).rv())

job.fileStore.deleteGlobalFile(alignment_file)
return root_job.addFollowOnJobFn(concatenate_chained_chunks, chained_chunk_ids,
disk=2 * alignment_file.size).rv()


def chain_one_split_chunk(job, chunk_file_id, params):
    """Run paffy chain on a single split chunk of a PAF file.

    :param chunk_file_id: job-store id of the chunk PAF; it is deleted from
        the job store once chaining succeeds, to reclaim space.
    :param params: XML config element; chaining parameters are read from the
        <blast> element (chainMaxGapLength, chainGapOpen, chainGapExtend,
        chainTrimFraction).
    :return: job-store id of the chained chunk PAF.
    """
    work_dir = job.fileStore.getLocalTempDir()
    chunk_path = os.path.join(work_dir, 'chunk.paf')
    output_path = os.path.join(work_dir, 'chained_chunk.paf')

    job.fileStore.readGlobalFile(chunk_file_id, chunk_path)

    cactus_call(parameters=['paffy', 'chain', "-i", chunk_path,
                            "--maxGapLength", params.find("blast").attrib["chainMaxGapLength"],
                            "--chainGapOpen", params.find("blast").attrib["chainGapOpen"],
                            "--chainGapExtend", params.find("blast").attrib["chainGapExtend"],
                            "--trimFraction", params.find("blast").attrib["chainTrimFraction"],
                            "--logLevel", getLogLevelString()],
                outfile=output_path, job_memory=job.memory)

    # Note: the original text also called deleteGlobalFile(alignment_file)
    # here, but alignment_file is not defined in this function (it belongs to
    # the caller, chain_one_alignment, which already deletes it) — that call
    # would raise NameError and is removed.
    job.fileStore.deleteGlobalFile(chunk_file_id)
    return job.fileStore.writeGlobalFile(output_path)


def concatenate_chained_chunks(job, chained_chunk_ids):
    """Merge the per-contig chained PAF chunks back into one combined file.

    Each chunk is read back from the job store, appended to the combined
    output in the given order, and then deleted from the job store to
    reclaim space. Returns the job-store id of the combined PAF.
    """
    tmp_dir = job.fileStore.getLocalTempDir()
    combined_path = os.path.join(tmp_dir, 'chained_combined.paf')

    with open(combined_path, 'w') as combined:
        for cid in chained_chunk_ids:
            local_chunk = job.fileStore.readGlobalFile(cid)
            with open(local_chunk, 'r') as src:
                shutil.copyfileobj(src, combined)
            job.fileStore.deleteGlobalFile(cid)

    return job.fileStore.writeGlobalFile(combined_path)


def tile_alignments(job, alignment_files, reference_event_name, params, has_resources=False, total_sequence_size=0):
# do everything post-chaining
# Memory: paffy tile loads all PAFs + creates SequenceCountArray (2 bytes per base of query sequence)
Expand Down
2 changes: 1 addition & 1 deletion submodules/sonLib