Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Pin j178/prek-action action to 91fd7d7 ([#3931](https://github.com/nf-core/tools/pull/3931))
- Add pre-commit hook to keep uv.lock in sync ([#3933](https://github.com/nf-core/tools/pull/3933))
- Update mcr.microsoft.com/devcontainers/miniconda Docker digest to 2be0f5a ([#3946](https://github.com/nf-core/tools/pull/3946))
- Fix quote handling in meta.yml ([#3948](https://github.com/nf-core/tools/pull/3948))
- Fix docker errors in test ([#3924](https://github.com/nf-core/tools/pull/3924))
- Update actions/checkout digest to 8e8c483 ([#3956](https://github.com/nf-core/tools/pull/3956))
- Update GitHub Actions ([#3957](https://github.com/nf-core/tools/pull/3957))
Expand Down
101 changes: 62 additions & 39 deletions nf_core/components/nfcore_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,13 @@ def get_inputs_from_main_nf(self) -> None:
log.debug(f"Could not find any inputs in {self.main_nf}")
return
input_data = data.split("input:")[1].split("output:")[0]
regex_keyword = r"\b(val|path)\b"
for line in input_data.split("\n"):
channel_elements: Any = []
line = line.split("//")[0] # remove any trailing comments
regex = r"\b(val|path)\b\s*(\(([^)]+)\)|\s*([^)\s,]+))"
matches = re.finditer(regex, line)
for _, match in enumerate(matches, start=1):
input_val = None
if match.group(3):
input_val = match.group(3).split(",")[0] # handle `files, stageAs: "inputs/*"` cases
elif match.group(4):
input_val = match.group(4).split(",")[0] # handle `files, stageAs: "inputs/*"` cases
if input_val:
input_val = re.split(r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)', input_val)[
0
] # Takes only first part, avoid commas in quotes
input_val = input_val.strip().strip("'").strip('"') # remove quotes and whitespaces
for match in re.finditer(regex_keyword, line):
if input_val := self._extract_value_from_line(line, match.end()):
input_val = self._split_first_param(input_val)
channel_elements.append({input_val: {}})
if len(channel_elements) == 1:
inputs.append(channel_elements[0])
Expand All @@ -244,6 +235,56 @@ def get_inputs_from_main_nf(self) -> None:
log.debug(f"Found {len(inputs)} inputs in {self.main_nf}")
self.inputs = inputs

def _split_first_param(self, value: str) -> str:
    """
    Extract first parameter from comma-separated list, respecting quotes.

    The value is scanned left to right and cut at the first comma that is
    not inside a single- or double-quoted string. The quote character that
    opened a string is the only one that can close it, so an apostrophe
    inside a double-quoted string (``"it's"``) does not confuse the split.
    Backslash escapes inside a quoted string are honoured.

    Args:
        value: String that may contain comma-separated parameters

    Returns:
        First parameter with whitespace stripped (surrounding quotes are
        kept). The whole stripped string if there is no top-level comma.
    """
    open_quote = None  # quote character of the string we are inside, if any
    escaped = False  # True when the previous in-string character was a backslash
    for idx, char in enumerate(value):
        if open_quote is not None:
            if escaped:
                # This character is escaped; it can neither close the string
                # nor start another escape.
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == open_quote:
                open_quote = None
        elif char in ('"', "'"):
            open_quote = char
        elif char == ",":
            # First comma outside any quoted string ends the first parameter.
            return value[:idx].strip()
    return value.strip()

def _extract_value_from_line(self, line: str, pos: int) -> str | None:
"""
Extract value after keyword, handling parentheses and quotes.

Uses a simple state machine to find matching closing parenthesis
while respecting quoted strings.

Args:
line: The line to parse
pos: Position in line where keyword ends

Returns:
Extracted value or None if not found
"""
rest = line[pos:].lstrip()
if not rest:
return None

if not rest.startswith("("):
# No parentheses, extract until comma or newline
match = re.match(r"([^,\n]*)", rest)
return match.group(1).strip() if match else None

# Find matching closing parentheses, respecting quotes
depth = 0
in_quote = None
for i, char in enumerate(rest):
if char in ('"', "'") and (i == 0 or rest[i - 1] != "\\"):
in_quote = char if in_quote is None else (None if in_quote == char else in_quote)
elif char == "(" and in_quote is None:
depth += 1
elif char == ")" and in_quote is None:
depth -= 1
if depth == 0:
return rest[1:i] # Return content between parentheses
return None

def get_outputs_from_main_nf(self):
with open(self.main_nf) as f:
data = f.read()
Expand All @@ -256,25 +297,16 @@ def get_outputs_from_main_nf(self):
output_data = data.split("output:")[1].split("when:")[0]
log.debug(f"Found output_data: {output_data}")
regex_emit = r"emit:\s*([^)\s,]+)"
regex_elements = r"\b(val|path|env|stdout|eval)\b\s*(\(([^)]+)\)|\s*([^)\s,]+))"
regex_keyword = r"\b(val|path|env|stdout|eval)\b"
for line in output_data.split("\n"):
match_emit = re.search(regex_emit, line)
matches_elements = re.finditer(regex_elements, line)
if not match_emit:
continue
channel_elements = []
outputs[match_emit.group(1)] = []
for _, match_element in enumerate(matches_elements, start=1):
output_val = None
if match_element.group(3):
output_val = match_element.group(3)
elif match_element.group(4):
output_val = match_element.group(4)
if output_val:
output_val = re.split(r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)', output_val)[
0
] # Takes only first part, avoid commas in quotes
output_val = output_val.strip().strip("'").strip('"') # remove quotes and whitespaces
for match in re.finditer(regex_keyword, line):
if output_val := self._extract_value_from_line(line, match.end()):
output_val = self._split_first_param(output_val)
channel_elements.append({output_val: {}})
if len(channel_elements) == 1:
outputs[match_emit.group(1)].append(channel_elements[0])
Expand Down Expand Up @@ -312,27 +344,18 @@ def get_topics_from_main_nf(self) -> None:
output_data = data.split("output:")[1].split("when:")[0]
log.debug(f"Output data: {output_data}")
regex_topic = r"topic:\s*([^)\s,]+)"
regex_elements = r"\b(val|path|env|stdout|eval)\b\s*(\(([^)]+)\)|\s*([^)\s,]+))"
regex_keyword = r"\b(val|path|env|stdout|eval)\b"
for line in output_data.split("\n"):
match_topic = re.search(regex_topic, line)
matches_elements = re.finditer(regex_elements, line)
if not match_topic:
continue
channel_elements: list[dict[str, dict]] = []
topic_name = match_topic.group(1)
if topic_name not in topics:
topics[topic_name] = []
for _, match_element in enumerate(matches_elements, start=1):
topic_val = None
if match_element.group(3):
topic_val = match_element.group(3)
elif match_element.group(4):
topic_val = match_element.group(4)
if topic_val:
topic_val = re.split(r',(?=(?:[^\'"]*[\'"][^\'"]*[\'"])*[^\'"]*$)', topic_val)[
0
] # Takes only first part, avoid commas in quotes
topic_val = topic_val.strip().strip("'").strip('"') # remove quotes and whitespaces
for match in re.finditer(regex_keyword, line):
if topic_val := self._extract_value_from_line(line, match.end()):
topic_val = self._split_first_param(topic_val)
channel_elements.append({topic_val: {}})
if len(channel_elements) == 1:
topics[topic_name].append(channel_elements[0])
Expand Down
Loading
Loading