S. VenkataKeerthy 21f1f9558d
[IR2Vec][llvm-ir2vec] Changing clEnumValN to cl::SubCommand (#151384)
Refactor llvm-ir2vec to use subcommands instead of a mode flag for better CLI usability.

- Converted the `--mode` flag to three distinct subcommands: `triplets`, `entities`, and `embeddings`
- Updated documentation, tests, and python script
2025-08-02 13:44:55 -07:00

305 lines
11 KiB
Python

# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""IR2Vec Triplet Generator
Generates IR2Vec triplets by applying random optimization levels to LLVM IR files
and extracting triplets using llvm-ir2vec. Automatically generates preprocessed
files: entity2id.txt, relation2id.txt, and train2id.txt.
Usage:
    python generateTriplets.py [-j MAX_WORKERS] [-v] [-q] <llvm_build_dir> <num_optimizations> <ll_file_list> <output_dir>
"""
import argparse
import logging
import os
import random
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Set, Tuple
# Configuration
# Optimization levels a file may be processed with; each entry is handed to
# opt prefixed with a dash (e.g. "-O2").
OPT_LEVELS = [
    "O0",
    "O1",
    "O2",
    "O3",
    "Os",
    "Oz",
]
# Thread-pool size used when the caller does not specify one.
DEFAULT_MAX_WORKERS = 100

# Module-level logger; handlers and level are configured by main().
logger = logging.getLogger(__name__)
# TODO: Change this to a dataclass with slots
# when Python 3.10+ is the minimum version
# https://docs.python.org/3/library/dataclasses.html#dataclasses.dataclass
class TripletResult:
    """Outcome of processing one LLVM IR file.

    Attributes are declared through ``__slots__``, so instances carry no
    per-instance ``__dict__`` and cannot grow extra attributes.
    """

    # triplets:     unique triplet lines collected for the file
    # max_relation: largest relation id reported while processing the file
    __slots__ = ["triplets", "max_relation"]

    def __init__(self, triplets: Set[str], max_relation: int):
        """Store the collected triplets and the highest relation id seen."""
        self.triplets, self.max_relation = triplets, max_relation
class IR2VecTripletGenerator:
    """Main class for generating IR2Vec triplets.

    Every input file is run through ``opt`` at a random selection of
    optimization levels and the result is piped into ``llvm-ir2vec triplets``.
    The unique triplets of all files are written to ``train2id.txt``, and
    ``entity2id.txt`` / ``relation2id.txt`` are generated next to it.
    """

    def __init__(
        self,
        llvm_build_dir: Path,
        num_optimizations: int,
        output_dir: Path,
        max_workers: int = DEFAULT_MAX_WORKERS,
    ):
        """Record the configuration, validate it and create the output directory.

        Args:
            llvm_build_dir: LLVM build directory; ``bin/opt`` and
                ``bin/llvm-ir2vec`` are looked up beneath it.
            num_optimizations: Number of distinct optimization levels applied
                to each input file (1 to ``len(OPT_LEVELS)``).
            output_dir: Directory receiving the generated files; created if it
                does not exist yet.
            max_workers: Size of the thread pool used to process files.

        Raises:
            FileNotFoundError: If the build directory or one of the tools is
                missing (or a tool is not executable).
            ValueError: If ``num_optimizations`` or ``max_workers`` is out of
                range.
        """
        # Coerce to Path so plain strings are accepted as well; the methods
        # below rely on Path operations (exists(), mkdir(), "/").
        self.llvm_build_dir = Path(llvm_build_dir)
        self.num_optimizations = num_optimizations
        self.output_dir = Path(output_dir)
        self.max_workers = max_workers

        # Tool paths
        self.opt_binary = os.path.join(self.llvm_build_dir, "bin", "opt")
        self.ir2vec_binary = os.path.join(self.llvm_build_dir, "bin", "llvm-ir2vec")

        self._validate_setup()

        # Create output directory if it doesn't exist
        self.output_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _is_executable_file(path: str) -> bool:
        """Return True if *path* is a regular file the current user may execute."""
        return os.path.isfile(path) and os.access(path, os.X_OK)

    def _validate_setup(self):
        """Validate that all required tools and paths exist.

        Raises:
            FileNotFoundError: If the build directory, ``opt`` or
                ``llvm-ir2vec`` cannot be found / executed.
            ValueError: If ``num_optimizations`` is not within
                ``1..len(OPT_LEVELS)`` or ``max_workers`` is below 1.
        """
        if not self.llvm_build_dir.exists():
            raise FileNotFoundError(
                f"LLVM build directory not found: {self.llvm_build_dir}"
            )

        if not self._is_executable_file(self.opt_binary):
            raise FileNotFoundError(
                f"opt binary not found or not executable: {self.opt_binary}"
            )

        if not self._is_executable_file(self.ir2vec_binary):
            raise FileNotFoundError(
                f"llvm-ir2vec binary not found or not executable: {self.ir2vec_binary}"
            )

        if not (1 <= self.num_optimizations <= len(OPT_LEVELS)):
            raise ValueError(
                f"Number of optimizations must be between 1-{len(OPT_LEVELS)}"
            )

        # ThreadPoolExecutor rejects non-positive sizes with ValueError as
        # well; checking here reports the problem before any work is started.
        if self.max_workers is not None and self.max_workers < 1:
            raise ValueError("Number of workers must be at least 1")

    def _select_optimization_levels(self) -> List[str]:
        """Select unique random optimization levels."""
        return random.sample(OPT_LEVELS, self.num_optimizations)

    def _process_single_file(self, input_file: Path) -> TripletResult:
        """Process a single LLVM IR file with multiple optimization levels.

        Returns:
            The union of the triplets produced at each selected optimization
            level, together with the largest relation id reported (at least 1).
        """
        all_triplets: Set[str] = set()
        max_relation = 1

        for opt_level in self._select_optimization_levels():
            triplets, file_max_relation = self._run_pipeline(input_file, opt_level)
            if not triplets:
                continue
            all_triplets.update(triplets)
            max_relation = max(max_relation, file_max_relation)
            logger.debug(
                "Generated %d triplets for %s with %s",
                len(triplets),
                input_file,
                opt_level,
            )

        return TripletResult(all_triplets, max_relation)

    def _run_pipeline(self, input_file: Path, opt_level: str) -> Tuple[Set[str], int]:
        """Run opt | llvm-ir2vec pipeline using subprocess pipes.

        Returns:
            ``(triplets, max_relation)`` as parsed from llvm-ir2vec's stdout,
            or ``(set(), 1)`` if either process fails or cannot be started.

        Raises:
            ValueError: If the tool output cannot be decoded or carries a
                malformed ``MAX_RELATION=`` header.
        """
        opt_cmd = [self.opt_binary, f"-{opt_level}", str(input_file), "-o", "-"]
        ir2vec_cmd = [self.ir2vec_binary, "triplets", "-", "-o", "-"]

        try:
            # opt's stderr is never consumed here, so it must not be a pipe:
            # an unread pipe fills up once opt prints enough diagnostics and
            # opt then blocks forever, stalling the whole pipeline.
            with subprocess.Popen(
                opt_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            ) as opt_proc:
                try:
                    # Run llvm-ir2vec with opt's output as input
                    ir2vec_proc = subprocess.Popen(
                        ir2vec_cmd,
                        stdin=opt_proc.stdout,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                    )
                finally:
                    # Drop our copy of the read end so opt sees a broken pipe
                    # if llvm-ir2vec exits early or could not be started.
                    opt_proc.stdout.close()

                # The context managers close the remaining pipes and reap both
                # children even when an exception interrupts the run.
                with ir2vec_proc:
                    triplet_text, ir2vec_stderr = ir2vec_proc.communicate()
        except (subprocess.SubprocessError, OSError) as e:
            logger.warning(
                "Could not run pipeline for %s with %s: %s", input_file, opt_level, e
            )
            return set(), 1

        # Check if either process failed
        if opt_proc.returncode != 0 or ir2vec_proc.returncode != 0:
            logger.warning(
                "Pipeline failed for %s with %s "
                "(opt exit code %s, llvm-ir2vec exit code %s)",
                input_file,
                opt_level,
                opt_proc.returncode,
                ir2vec_proc.returncode,
            )
            if ir2vec_stderr and ir2vec_stderr.strip():
                logger.debug(
                    "llvm-ir2vec stderr for %s: %s", input_file, ir2vec_stderr.strip()
                )
            return set(), 1

        return self._parse_triplet_output(triplet_text)

    def _parse_triplet_output(self, output: str) -> Tuple[Set[str], int]:
        """Parse triplet output and extract max relation.

        An optional first line of the form ``MAX_RELATION=<int>`` supplies the
        maximum relation id (default 1); every other non-blank line is kept
        verbatim as one triplet.

        Raises:
            ValueError: If the ``MAX_RELATION=`` value is not an integer.
        """
        # Skip blank lines so they are not counted as (empty) triplets.
        lines = [line for line in output.strip().split("\n") if line.strip()]
        if not lines:
            return set(), 1

        max_relation = 1
        # Extract max relation from metadata line
        if lines[0].startswith("MAX_RELATION="):
            max_relation = int(lines[0].split("=", 1)[1])
            lines = lines[1:]

        # Remove duplicate triplets by converting to a set
        return set(lines), max_relation

    def generate_triplets(self, file_list: Path) -> None:
        """Main method to generate triplets from a list of LLVM IR files.

        Args:
            file_list: Text file naming one LLVM IR file per line.

        Raises:
            ValueError: If ``file_list`` names no existing file.
            subprocess.CalledProcessError: If ``llvm-ir2vec entities`` fails
                while the output files are written.
        """
        input_files = self._read_file_list(file_list)
        logger.info(
            f"Processing {len(input_files)} files with {self.num_optimizations} "
            f"optimization levels using {self.max_workers} workers"
        )

        all_triplets: Set[str] = set()
        global_max_relation = 1
        failed_files = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(self._process_single_file, file): file
                for file in input_files
            }

            for future in as_completed(future_to_file):
                try:
                    result = future.result()
                except (subprocess.SubprocessError, OSError, ValueError) as e:
                    # A single bad file must not abort the whole run.
                    failed_files += 1
                    logger.error(f"Error processing {future_to_file[future]}: {e}")
                    continue
                all_triplets.update(result.triplets)
                global_max_relation = max(global_max_relation, result.max_relation)

        self._generate_output_files(all_triplets, global_max_relation)

        if failed_files:
            logger.warning("Processing completed with %d failed file(s)", failed_files)
        else:
            logger.info("Processing completed successfully")

    def _read_file_list(self, file_list: Path) -> List[Path]:
        """Read and validate the list of input files.

        Blank lines are ignored; paths that do not exist are skipped with a
        warning.

        Raises:
            ValueError: If no listed path exists.
        """
        input_files = []
        with open(file_list, "r") as f:
            for line_num, line in enumerate(f, 1):
                if line := line.strip():
                    file_path = Path(line)
                    if file_path.exists():
                        input_files.append(file_path)
                    else:
                        logger.warning(f"File not found (line {line_num}): {file_path}")

        if not input_files:
            raise ValueError("No valid input files found")
        return input_files

    def _generate_output_files(self, all_triplets: Set[str], max_relation: int) -> None:
        """Generate the final output files.

        Writes ``train2id.txt`` (triplet count followed by one triplet per
        line) and then produces ``entity2id.txt`` and ``relation2id.txt``.
        """
        logger.info(f"Generating output files with {len(all_triplets)} unique triplets")

        # Write all output files -- train2id.txt, entity2id.txt, relation2id.txt
        output_dir = Path(self.output_dir)
        train2id_file = output_dir / "train2id.txt"
        entity2id_file = output_dir / "entity2id.txt"
        relation2id_file = output_dir / "relation2id.txt"

        with open(train2id_file, "w") as f:
            f.write(f"{len(all_triplets)}\n")
            # Sort so the file is reproducible; set iteration order of strings
            # changes from run to run.
            f.writelines(f"{triplet}\n" for triplet in sorted(all_triplets))

        self._generate_entity2id(entity2id_file)
        self._generate_relation2id(relation2id_file, max_relation)

    def _generate_entity2id(self, output_file: Path) -> None:
        """Generate entity2id.txt using llvm-ir2vec.

        Raises:
            subprocess.CalledProcessError: If ``llvm-ir2vec entities`` exits
                with a non-zero status.
        """
        try:
            subprocess.run(
                [str(self.ir2vec_binary), "entities", "-o", str(output_file)],
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            # The captured stderr is not part of the exception message, so
            # surface it before propagating the failure.
            logger.error(
                "llvm-ir2vec entities failed (exit code %s): %s",
                e.returncode,
                (e.stderr or "").strip(),
            )
            raise

    def _generate_relation2id(self, output_file: Path, max_relation: int) -> None:
        """Generate relation2id.txt from max relation.

        The file starts with the relation count, followed by ``Type`` (0),
        ``Next`` (1) and ``Arg0``, ``Arg1``, ... for ids 2..``max_relation``.
        """
        max_relation = max(max_relation, 1)  # At least Type and Next relations
        num_relations = max_relation + 1

        with open(output_file, "w") as f:
            f.write(f"{num_relations}\n")
            f.write("Type\t0\n")
            f.write("Next\t1\n")
            f.writelines(f"Arg{i-2}\t{i}\n" for i in range(2, num_relations))
def main():
    """Command-line entry point: parse arguments, set up logging, run the generator."""
    cli = argparse.ArgumentParser(
        description="Generate IR2Vec triplets from LLVM IR files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Positional arguments, in the order they must appear on the command line.
    positionals = (
        ("llvm_build_dir", Path, "Path to LLVM build directory"),
        ("num_optimizations", int, "Number of optimization levels to apply (1-6)"),
        ("ll_file_list", Path, "File containing list of LLVM IR files to process"),
        ("output_dir", Path, "Output directory for generated files"),
    )
    for dest, converter, description in positionals:
        cli.add_argument(dest, type=converter, help=description)

    cli.add_argument(
        "-j",
        "--max-workers",
        type=int,
        default=DEFAULT_MAX_WORKERS,
        help=f"Maximum number of parallel workers (default: {DEFAULT_MAX_WORKERS})",
    )
    cli.add_argument(
        "-v", "--verbose", action="store_true", help="Enable debug logging"
    )
    cli.add_argument(
        "-q", "--quiet", action="store_true", help="Suppress all output except errors"
    )
    options = cli.parse_args()

    # Configure logging; --quiet wins when both --quiet and --verbose are given.
    if options.quiet:
        log_level = logging.ERROR
    elif options.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        level=log_level,
        format="[%(asctime)s] %(levelname)s: %(message)s",
        datefmt="%H:%M:%S",
    )

    triplet_generator = IR2VecTripletGenerator(
        options.llvm_build_dir,
        options.num_optimizations,
        options.output_dir,
        options.max_workers,
    )
    triplet_generator.generate_triplets(options.ll_file_list)
# Run the command-line interface only when this file is executed as a script,
# so importing the module has no side effects.
if __name__ == "__main__":
    main()