keyword_extractor¶
Full name: tenets.core.nlp.keyword_extractor
Keyword extraction using multiple methods.
This module provides comprehensive keyword extraction using:

- RAKE (Rapid Automatic Keyword Extraction) - primary method
- YAKE (if available and Python < 3.13)
- TF-IDF with code-aware tokenization
- BM25 ranking
- Simple frequency-based extraction
Consolidates all keyword extraction logic to avoid duplication.
Classes¶
SimpleRAKE¶
Simple RAKE-like keyword extraction without NLTK dependencies.
Implements the core RAKE algorithm without requiring NLTK's punkt tokenizer. Uses simple regex-based sentence splitting and word tokenization.
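A minimal usage sketch, assuming a tiny hand-picked stopword set (in practice KeywordExtractor constructs SimpleRAKE with bundled or fallback stopwords):

```python
from tenets.core.nlp.keyword_extractor import SimpleRAKE

# Illustrative stopword set; the real extractor loads a bundled list.
rake = SimpleRAKE(stopwords={"the", "a", "an", "and", "to", "of"}, max_length=3)
rake.extract_keywords_from_text("Implement the OAuth2 authentication flow and refresh tokens")

# Ranked (score, phrase) tuples; multi-word phrases tend to score highest.
for score, phrase in rake.get_ranked_phrases_with_scores():
    print(f"{score:.2f}  {phrase}")
```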
Initialize SimpleRAKE.
PARAMETER | DESCRIPTION |
---|---|
stopwords | Set of stopwords to use |
max_length | Maximum n-gram length. TYPE: int |
Source code in tenets/core/nlp/keyword_extractor.py
Functions¶
extract_keywords_from_text¶
Extract keywords from text.
PARAMETER | DESCRIPTION |
---|---|
text | Input text. TYPE: str |
Source code in tenets/core/nlp/keyword_extractor.py
def extract_keywords_from_text(self, text: str):
"""Extract keywords from text.
Args:
text: Input text
"""
# Simple sentence splitting (period, exclamation, question mark, newline)
sentences = re.split(r"[.!?\n]+", text.lower())
# Extract candidate keywords from each sentence
candidates = []
for sentence in sentences:
# Remove non-word characters except spaces
sentence = re.sub(r"[^\w\s]", " ", sentence)
# Split by stopwords to get candidate phrases
words = sentence.split()
current_phrase = []
for word in words:
if word and word not in self.stopwords:
current_phrase.append(word)
elif current_phrase:
# End of phrase, add if within max length
if len(current_phrase) <= self.max_length:
candidates.append(" ".join(current_phrase))
current_phrase = []
# Don't forget the last phrase
if current_phrase and len(current_phrase) <= self.max_length:
candidates.append(" ".join(current_phrase))
# Calculate word scores (degree/frequency)
word_freq = Counter()
word_degree = Counter()
for phrase in candidates:
words_in_phrase = phrase.split()
degree = len(words_in_phrase)
for word in words_in_phrase:
word_freq[word] += 1
word_degree[word] += degree
# Calculate word scores
word_scores = {}
for word in word_freq:
word_scores[word] = word_degree[word] / word_freq[word]
# Calculate phrase scores
phrase_scores = {}
for phrase in candidates:
phrase_words = phrase.split()
phrase_scores[phrase] = sum(word_scores.get(w, 0) for w in phrase_words)
# Sort phrases by score
self.keywords = sorted(phrase_scores.items(), key=lambda x: x[1], reverse=True)
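For intuition, the degree/frequency scoring above can be reproduced by hand on a tiny candidate list (a standalone sketch of the same arithmetic, not a call into SimpleRAKE):

```python
from collections import Counter

# Candidate phrases as RAKE would produce them (stopwords already stripped).
candidates = ["keyword extraction", "keyword"]

word_freq, word_degree = Counter(), Counter()
for phrase in candidates:
    words = phrase.split()
    for w in words:
        word_freq[w] += 1             # how often the word occurs overall
        word_degree[w] += len(words)  # "degree": co-occurrence within phrases

word_scores = {w: word_degree[w] / word_freq[w] for w in word_freq}
# keyword: 3 / 2 = 1.5, extraction: 2 / 1 = 2.0
phrase_scores = {p: sum(word_scores[w] for w in p.split()) for p in candidates}
# {"keyword extraction": 3.5, "keyword": 1.5} -- longer phrases rank first
```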
get_ranked_phrases_with_scores¶
Get ranked phrases with scores.
RETURNS | DESCRIPTION |
---|---|
List[Tuple[float, str]] | List of (score, phrase) tuples |
KeywordExtractor¶
KeywordExtractor(use_rake: bool = True, use_yake: bool = True, language: str = 'en', use_stopwords: bool = True, stopword_set: str = 'prompt')
Multi-method keyword extraction with automatic fallback.
Provides robust keyword extraction using multiple algorithms with automatic fallback based on availability and Python version compatibility. Prioritizes fast, accurate methods while ensuring compatibility across Python versions.
Methods are attempted in order:
- RAKE (Rapid Automatic Keyword Extraction) - Primary method, fast and Python 3.13+ compatible
- YAKE (Yet Another Keyword Extractor) - Secondary method, only for Python < 3.13 due to compatibility issues
- TF-IDF - Custom implementation, always available
- Frequency-based - Final fallback, simple but effective
ATTRIBUTE | DESCRIPTION |
---|---|
use_rake | Whether RAKE extraction is enabled and available. TYPE: bool |
use_yake | Whether YAKE extraction is enabled and available. TYPE: bool |
language | Language code for extraction (e.g., 'en' for English). TYPE: str |
use_stopwords | Whether to filter stopwords during extraction. TYPE: bool |
stopword_set | Which stopword set to use ('code' or 'prompt'). TYPE: str |
rake_extractor | RAKE extractor instance if available. TYPE: Optional[SimpleRAKE] |
yake_extractor | YAKE instance if available. TYPE: Optional[yake.KeywordExtractor] |
tokenizer | Tokenizer for fallback extraction. TYPE: TextTokenizer |
stopwords | Set of stopwords if filtering is enabled. |
Example

>>> extractor = KeywordExtractor()
>>> keywords = extractor.extract("implement OAuth2 authentication")
>>> print(keywords)
['oauth2 authentication', 'implement', 'authentication']

>>> # Get keywords with scores
>>> keywords_with_scores = extractor.extract(
...     "implement OAuth2 authentication",
...     include_scores=True
... )
>>> print(keywords_with_scores)
[('oauth2 authentication', 0.9), ('implement', 0.7), ...]
Note
On Python 3.13+, YAKE is automatically disabled due to a known infinite loop bug. RAKE is used as the primary extractor instead, providing similar quality with better performance.
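The gating amounts to a version check plus an optional import, roughly like this sketch (the module's actual detection code may differ in detail):

```python
import sys

YAKE_AVAILABLE = False
if sys.version_info[:2] < (3, 13):  # YAKE has a known issue on 3.13+
    try:
        import yake  # optional dependency
        YAKE_AVAILABLE = True
    except ImportError:
        pass
```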
Initialize keyword extractor with configurable extraction methods.
PARAMETER | DESCRIPTION |
---|---|
use_rake | Enable RAKE extraction if available. RAKE is fast and works well with technical text. Defaults to True. TYPE: bool |
use_yake | Enable YAKE extraction if available. Automatically disabled on Python 3.13+ due to compatibility issues. Defaults to True. TYPE: bool |
language | Language code for extraction algorithms. Currently supports 'en' (English). Other languages may work but are not officially tested. Defaults to 'en'. TYPE: str |
use_stopwords | Whether to filter common stopwords during extraction. This can improve keyword quality but may miss some contextual phrases. Defaults to True. TYPE: bool |
stopword_set | Which stopword set to use: 'prompt' for aggressive filtering of user prompts (200+ words), 'code' for minimal filtering in code analysis (30 words). Defaults to 'prompt'. TYPE: str |
RAISES | DESCRIPTION |
---|---|
None | Gracefully handles missing dependencies and logs warnings. |
Note
The extractor automatically detects available libraries and Python version to choose the best extraction method. If RAKE and YAKE are unavailable, it falls back to TF-IDF and frequency-based extraction.
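For code-heavy input, a caller might keep RAKE enabled but switch to the lighter stopword set (illustrative values only):

```python
from tenets.core.nlp.keyword_extractor import KeywordExtractor

# 'code' applies minimal stopword filtering, which suits identifier-rich text.
extractor = KeywordExtractor(use_rake=True, use_stopwords=True, stopword_set="code")
keywords = extractor.extract(
    "def authenticate_user(token): validate the JWT signature", max_keywords=10
)
```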
Source code in tenets/core/nlp/keyword_extractor.py
def __init__(
self,
use_rake: bool = True,
use_yake: bool = True,
language: str = "en",
use_stopwords: bool = True,
stopword_set: str = "prompt",
):
"""Initialize keyword extractor with configurable extraction methods.
Args:
use_rake (bool, optional): Enable RAKE extraction if available.
RAKE is fast and works well with technical text. Defaults to True.
use_yake (bool, optional): Enable YAKE extraction if available.
Automatically disabled on Python 3.13+ due to compatibility issues.
Defaults to True.
language (str, optional): Language code for extraction algorithms.
Currently supports 'en' (English). Other languages may work but
are not officially tested. Defaults to 'en'.
use_stopwords (bool, optional): Whether to filter common stopwords
during extraction. This can improve keyword quality but may miss
some contextual phrases. Defaults to True.
stopword_set (str, optional): Which stopword set to use.
Options are:
- 'prompt': Aggressive filtering for user prompts (200+ words)
- 'code': Minimal filtering for code analysis (30 words)
Defaults to 'prompt'.
Raises:
None: Gracefully handles missing dependencies and logs warnings.
Note:
The extractor automatically detects available libraries and Python
version to choose the best extraction method. If RAKE and YAKE are
unavailable, it falls back to TF-IDF and frequency-based extraction.
"""
self.logger = get_logger(__name__)
self.use_rake = use_rake and RAKE_AVAILABLE
self.use_yake = use_yake and YAKE_AVAILABLE
self.language = language
self.use_stopwords = use_stopwords
self.stopword_set = stopword_set
# Log info about extraction methods
if sys.version_info[:2] >= (3, 13):
if not self.use_rake and RAKE_AVAILABLE:
self.logger.info("RAKE keyword extraction available but disabled")
if use_yake and not YAKE_AVAILABLE:
self.logger.warning(
"YAKE keyword extraction disabled on Python 3.13+ due to compatibility issues. "
"Using RAKE as primary extraction method."
)
# Initialize RAKE if available (primary method)
if self.use_rake and Rake is not None:
# Always use our bundled stopwords to avoid NLTK data dependency issues
from pathlib import Path
# Try to load bundled stopwords first
stopwords_path = (
Path(__file__).parent.parent.parent / "data" / "stopwords" / "minimal.txt"
)
if stopwords_path.exists():
try:
with open(stopwords_path, encoding="utf-8") as f:
stopwords = set(
line.strip().lower()
for line in f
if line.strip() and not line.startswith("#")
)
self.logger.debug(f"Loaded {len(stopwords)} stopwords from {stopwords_path}")
except Exception as e:
self.logger.warning(f"Failed to load stopwords file: {e}, using fallback")
stopwords = None
else:
stopwords = None
# Fallback to basic English stopwords if file not found
if not stopwords:
stopwords = {
"the",
"a",
"an",
"and",
"or",
"but",
"in",
"on",
"at",
"to",
"for",
"of",
"with",
"by",
"from",
"up",
"about",
"into",
"through",
"during",
"before",
"after",
"above",
"below",
"between",
"under",
"again",
"further",
"then",
"once",
"is",
"am",
"are",
"was",
"were",
"be",
"have",
"has",
"had",
"do",
"does",
"did",
"will",
"would",
"could",
"should",
"may",
"might",
"must",
"can",
"this",
"that",
"these",
"those",
"i",
"you",
"he",
"she",
"it",
"we",
"they",
"what",
"which",
"who",
"when",
"where",
"why",
"how",
"all",
"each",
"few",
"more",
"some",
"such",
"only",
"own",
"same",
"so",
"than",
"too",
"very",
}
self.logger.debug("Using built-in fallback stopwords")
try:
# Initialize RAKE with our custom stopwords (avoiding NLTK data dependency)
# We'll create a simple RAKE-like extractor to avoid NLTK punkt dependency
self.rake_extractor = SimpleRAKE(
stopwords=stopwords,
max_length=3, # Max n-gram size
)
except Exception as e:
self.logger.warning(f"Failed to initialize RAKE: {e}")
self.rake_extractor = None
self.use_rake = False
else:
self.rake_extractor = None
# Initialize YAKE if available (secondary method for Python < 3.13)
if self.use_yake and yake is not None:
self.yake_extractor = yake.KeywordExtractor(
lan=language,
n=3, # Max n-gram size
dedupLim=0.7,
dedupFunc="seqm",
windowsSize=1,
top=30,
)
else:
self.yake_extractor = None
# Initialize tokenizer
from .tokenizer import TextTokenizer
self.tokenizer = TextTokenizer(use_stopwords=use_stopwords)
# Get stopwords if needed
if use_stopwords:
from .stopwords import StopwordManager
self.stopwords = StopwordManager().get_set(stopword_set)
else:
self.stopwords = None
Functions¶
extract¶
extract(text: str, max_keywords: int = 20, include_scores: bool = False) -> Union[List[str], List[Tuple[str, float]]]
Extract keywords from text using the best available method.
Attempts extraction methods in priority order (RAKE → YAKE → TF-IDF → Frequency) until one succeeds. Each method returns normalized scores between 0 and 1, with higher scores indicating more relevant keywords.
PARAMETER | DESCRIPTION |
---|---|
text | Input text to extract keywords from. Can be any length, but very long texts may be truncated by some algorithms. TYPE: str |
max_keywords | Maximum number of keywords to return. Keywords are sorted by relevance score. Defaults to 20. TYPE: int |
include_scores | If True, return (keyword, score) tuples. If False, return only keyword strings. Defaults to False. TYPE: bool |
RETURNS | DESCRIPTION |
---|---|
Union[List[str], List[Tuple[str, float]]] | If include_scores=False: list of keyword strings sorted by relevance (e.g., ['oauth2', 'authentication', 'implement']). If include_scores=True: list of (keyword, score) tuples where scores are normalized between 0 and 1 (e.g., [('oauth2', 0.95), ('authentication', 0.87), ...]). |
Examples:
>>> extractor = KeywordExtractor()
>>> # Simple keyword extraction
>>> keywords = extractor.extract("Python web framework Django")
>>> print(keywords)
['django', 'python web framework', 'web framework']
>>> # With scores for ranking
>>> scored = extractor.extract("Python web framework Django",
... max_keywords=5, include_scores=True)
>>> for keyword, score in scored:
... print(f"{keyword}: {score:.2f}")
django: 0.95
python web framework: 0.87
web framework: 0.82
Note
Empty input returns an empty list. All extraction methods handle various text formats including code, documentation, and natural language. Scores are normalized for consistency across methods.
Source code in tenets/core/nlp/keyword_extractor.py
def extract(
self, text: str, max_keywords: int = 20, include_scores: bool = False
) -> Union[List[str], List[Tuple[str, float]]]:
"""Extract keywords from text using the best available method.
Attempts extraction methods in priority order (RAKE → YAKE → TF-IDF →
Frequency) until one succeeds. Each method returns normalized scores
between 0 and 1, with higher scores indicating more relevant keywords.
Args:
text (str): Input text to extract keywords from. Can be any length,
but very long texts may be truncated by some algorithms.
max_keywords (int, optional): Maximum number of keywords to return.
Keywords are sorted by relevance score. Defaults to 20.
include_scores (bool, optional): If True, return (keyword, score)
tuples. If False, return only keyword strings. Defaults to False.
Returns:
Union[List[str], List[Tuple[str, float]]]:
- If include_scores=False: List of keyword strings sorted by
relevance (e.g., ['oauth2', 'authentication', 'implement'])
- If include_scores=True: List of (keyword, score) tuples where
scores are normalized between 0 and 1 (e.g.,
[('oauth2', 0.95), ('authentication', 0.87), ...])
Examples:
>>> extractor = KeywordExtractor()
>>> # Simple keyword extraction
>>> keywords = extractor.extract("Python web framework Django")
>>> print(keywords)
['django', 'python web framework', 'web framework']
>>> # With scores for ranking
>>> scored = extractor.extract("Python web framework Django",
... max_keywords=5, include_scores=True)
>>> for keyword, score in scored:
... print(f"{keyword}: {score:.2f}")
django: 0.95
python web framework: 0.87
web framework: 0.82
Note:
Empty input returns an empty list. All extraction methods handle
various text formats including code, documentation, and natural
language. Scores are normalized for consistency across methods.
"""
if not text:
return []
# Try RAKE first (primary method, Python 3.13 compatible)
if self.use_rake and self.rake_extractor:
try:
# SimpleRAKE handles its own tokenization
self.rake_extractor.extract_keywords_from_text(text)
keywords_with_scores = self.rake_extractor.get_ranked_phrases_with_scores()
# RAKE returns (score, phrase) tuples, normalize scores
if keywords_with_scores:
max_score = max(score for score, _ in keywords_with_scores)
if max_score > 0:
keywords = [
(phrase, score / max_score)
for score, phrase in keywords_with_scores[:max_keywords]
]
else:
keywords = [
(phrase, 1.0) for _, phrase in keywords_with_scores[:max_keywords]
]
else:
keywords = []
if include_scores:
return keywords
return [kw for kw, _ in keywords]
except Exception as e:
self.logger.warning(f"RAKE extraction failed: {e}")
# Try YAKE second (if available and Python < 3.13)
if self.use_yake and self.yake_extractor:
try:
keywords = self.yake_extractor.extract_keywords(text)
# YAKE returns (keyword, score) where lower score is better
keywords = [(kw, 1.0 - score) for kw, score in keywords[:max_keywords]]
if include_scores:
return keywords
return [kw for kw, _ in keywords]
except Exception as e:
self.logger.warning(f"YAKE extraction failed: {e}")
# Fallback to TF-IDF or frequency
return self._extract_fallback(text, max_keywords, include_scores)
TFIDFCalculator¶
TF-IDF calculator with code-aware tokenization.
Implements Term Frequency-Inverse Document Frequency scoring optimized for code search. Uses vector space model with cosine similarity for ranking.
Key features:

- Code-aware tokenization using NLP tokenizers
- Configurable stopword filtering
- Sublinear TF scaling to reduce impact of very frequent terms
- L2 normalization for cosine similarity
- Efficient sparse vector representation
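A minimal corpus-building sketch using the methods documented below (document ids and texts are made up):

```python
from tenets.core.nlp.keyword_extractor import TFIDFCalculator

calc = TFIDFCalculator(use_stopwords=True, stopword_set="code")
calc.build_corpus([
    ("auth.py", "def login(user, password): verify credentials and issue a token"),
    ("db.py", "def connect(dsn): open a database connection pool"),
])

print(calc.get_top_terms("auth.py", n=5))                      # highest-weight TF-IDF terms
print(calc.compute_similarity("user login token", "auth.py"))  # cosine similarity in [0, 1]
```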
Initialize TF-IDF calculator.
PARAMETER | DESCRIPTION |
---|---|
use_stopwords | Whether to filter stopwords. TYPE: bool |
stopword_set | Which stopword set to use ('code', 'prompt'). TYPE: str |
Source code in tenets/core/nlp/keyword_extractor.py
def __init__(self, use_stopwords: bool = False, stopword_set: str = "code"):
"""Initialize TF-IDF calculator.
Args:
use_stopwords: Whether to filter stopwords
stopword_set: Which stopword set to use ('code', 'prompt')
"""
self.logger = get_logger(__name__)
self.use_stopwords = use_stopwords
self.stopword_set = stopword_set
# Use NLP tokenizer
from .tokenizer import CodeTokenizer
self.tokenizer = CodeTokenizer(use_stopwords=use_stopwords)
# Core data structures
self.document_count = 0
self.document_frequency: Dict[str, int] = defaultdict(int)
self.document_vectors: Dict[str, Dict[str, float]] = {}
self.document_norms: Dict[str, float] = {}
self.idf_cache: Dict[str, float] = {}
self.vocabulary: Set[str] = set()
Functions¶
tokenize¶
compute_tf¶
Compute term frequency with optional sublinear scaling.
PARAMETER | DESCRIPTION |
---|---|
tokens | List of tokens from document |
use_sublinear | Use log scaling (1 + log(tf)) to reduce impact of very frequent terms. TYPE: bool |
RETURNS | DESCRIPTION |
---|---|
Dict[str, float] | Dictionary mapping terms to TF scores |
Source code in tenets/core/nlp/keyword_extractor.py
def compute_tf(self, tokens: List[str], use_sublinear: bool = True) -> Dict[str, float]:
"""Compute term frequency with optional sublinear scaling.
Args:
tokens: List of tokens from document
use_sublinear: Use log scaling (1 + log(tf)) to reduce impact of
very frequent terms
Returns:
Dictionary mapping terms to TF scores
"""
if not tokens:
return {}
tf_raw = Counter(tokens)
if use_sublinear:
# Sublinear TF: 1 + log(count)
return {term: 1.0 + math.log(count) for term, count in tf_raw.items()}
else:
# Normalized TF: count / total
total = len(tokens)
return {term: count / total for term, count in tf_raw.items()}
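The effect of sublinear scaling is easy to see on a small token list (a standalone illustration of the formula above, not part of the class):

```python
import math
from collections import Counter

tokens = ["auth"] * 10 + ["token"] * 2 + ["login"]
raw = Counter(tokens)

sublinear = {t: 1.0 + math.log(c) for t, c in raw.items()}
# auth: 1 + ln(10) ≈ 3.30, token: 1 + ln(2) ≈ 1.69, login: 1.00
# A term 10x more frequent scores ~3.3x, not 10x -- repetition is dampened.
```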
compute_idf¶
Compute inverse document frequency for a term.
PARAMETER | DESCRIPTION |
---|---|
term | Term to compute IDF for. TYPE: str |
RETURNS | DESCRIPTION |
---|---|
float | IDF value |
Source code in tenets/core/nlp/keyword_extractor.py
def compute_idf(self, term: str) -> float:
"""Compute inverse document frequency for a term.
Args:
term: Term to compute IDF for
Returns:
IDF value
"""
if term in self.idf_cache:
return self.idf_cache[term]
if self.document_count == 0:
return 0.0
# Use smoothed IDF to handle edge cases
df = self.document_frequency.get(term, 0)
# Use standard smoothed IDF that varies with document_count and df
# idf = log((N + 1) / (df + 1)) with a tiny epsilon so values can
# change detectably when the corpus grows even if df grows as well.
idf = math.log((1 + self.document_count) / (1 + df))
# Add a very small epsilon dependent on corpus size to avoid identical
# floats when called before/after cache invalidation in tiny corpora.
idf += 1e-12 * max(1, self.document_count)
self.idf_cache[term] = idf
return idf
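A quick numeric check of the smoothed formula (corpus sizes chosen arbitrarily):

```python
import math

N = 100                                # documents in the corpus
rare = math.log((1 + N) / (1 + 4))     # df=4  -> ≈ 3.01 (distinctive term)
common = math.log((1 + N) / (1 + 90))  # df=90 -> ≈ 0.10 (near-ubiquitous term)
```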
add_document¶
Add document to corpus and compute TF-IDF vector.
PARAMETER | DESCRIPTION |
---|---|
doc_id | Unique document identifier. TYPE: str |
text | Document text content. TYPE: str |
RETURNS | DESCRIPTION |
---|---|
Dict[str, float] | TF-IDF vector for the document |
Source code in tenets/core/nlp/keyword_extractor.py
def add_document(self, doc_id: str, text: str) -> Dict[str, float]:
"""Add document to corpus and compute TF-IDF vector.
Args:
doc_id: Unique document identifier
text: Document text content
Returns:
TF-IDF vector for the document
"""
# Tokenize document using NLP tokenizer
tokens = self.tokenize(text)
if not tokens:
self.document_vectors[doc_id] = {}
self.document_norms[doc_id] = 0.0
return {}
# Update corpus statistics
self.document_count += 1
unique_terms = set(tokens)
for term in unique_terms:
self.document_frequency[term] += 1
self.vocabulary.add(term)
# Compute TF scores
tf_scores = self.compute_tf(tokens)
# Compute TF-IDF vector
tfidf_vector = {}
for term, tf in tf_scores.items():
# Use +1 smoothing on IDF during vector construction to avoid
# zero vectors in tiny corpora while keeping compute_idf()'s
# return value unchanged for tests that assert it directly.
idf = self.compute_idf(term) + 1.0
tfidf_vector[term] = tf * idf
# L2 normalization for cosine similarity
norm = math.sqrt(sum(score**2 for score in tfidf_vector.values()))
if norm > 0:
tfidf_vector = {term: score / norm for term, score in tfidf_vector.items()}
self.document_norms[doc_id] = norm
else:
self.document_norms[doc_id] = 0.0
self.document_vectors[doc_id] = tfidf_vector
# Clear IDF cache since document frequencies changed
self.idf_cache.clear()
return tfidf_vector
compute_similarity¶
Compute cosine similarity between query and document.
PARAMETER | DESCRIPTION |
---|---|
query_text | Query text. TYPE: str |
doc_id | Document identifier. TYPE: str |
RETURNS | DESCRIPTION |
---|---|
float | Cosine similarity score (0-1) |
Source code in tenets/core/nlp/keyword_extractor.py
def compute_similarity(self, query_text: str, doc_id: str) -> float:
"""Compute cosine similarity between query and document.
Args:
query_text: Query text
doc_id: Document identifier
Returns:
Cosine similarity score (0-1)
"""
# Get document vector
doc_vector = self.document_vectors.get(doc_id, {})
if not doc_vector:
return 0.0
# Process query using NLP tokenizer
query_tokens = self.tokenize(query_text)
if not query_tokens:
return 0.0
# Compute query TF-IDF vector
query_tf = self.compute_tf(query_tokens)
query_vector = {}
for term, tf in query_tf.items():
if term in self.vocabulary:
# Match the +1 smoothing used during document vector build
idf = self.compute_idf(term) + 1.0
query_vector[term] = tf * idf
# Normalize query vector
query_norm = math.sqrt(sum(score**2 for score in query_vector.values()))
if query_norm > 0:
query_vector = {term: score / query_norm for term, score in query_vector.items()}
else:
return 0.0
# Use sparse cosine similarity from similarity module
from .similarity import sparse_cosine_similarity
return sparse_cosine_similarity(query_vector, doc_vector)
build_corpus¶
Build TF-IDF corpus from multiple documents.
PARAMETER | DESCRIPTION |
---|---|
documents | List of (doc_id, text) tuples |
Source code in tenets/core/nlp/keyword_extractor.py
def build_corpus(self, documents: List[Tuple[str, str]]) -> None:
"""Build TF-IDF corpus from multiple documents.
Args:
documents: List of (doc_id, text) tuples
"""
import os
cpu_count = os.cpu_count() or 1
self.logger.info(
f"Building TF-IDF corpus from {len(documents)} documents "
f"(sequential processing, CPU cores available: {cpu_count})"
)
for doc_id, text in documents:
self.add_document(doc_id, text)
self.logger.info(
f"Corpus built: {self.document_count} documents, {len(self.vocabulary)} unique terms"
)
get_top_terms¶
Return top-n terms by TF-IDF weight for a document.
PARAMETER | DESCRIPTION |
---|---|
doc_id | Document identifier. TYPE: str |
n | Max number of terms to return. TYPE: int |
RETURNS | DESCRIPTION |
---|---|
List[Tuple[str, float]] | List of (term, score) sorted by descending score. |
Source code in tenets/core/nlp/keyword_extractor.py
def get_top_terms(self, doc_id: str, n: int = 10) -> List[Tuple[str, float]]:
"""Return top-n terms by TF-IDF weight for a document.
Args:
doc_id: Document identifier
n: Max number of terms to return
Returns:
List of (term, score) sorted by descending score.
"""
vector = self.document_vectors.get(doc_id, {})
if not vector:
return []
# Already L2-normalized; return the highest-weight terms
return sorted(vector.items(), key=lambda x: x[1], reverse=True)[: max(0, n)]
BM25Calculator¶
BM25Calculator(k1: float = 1.2, b: float = 0.75, use_stopwords: bool = False, stopword_set: str = 'code')
BM25 ranking algorithm implementation.
BM25 (Best Matching 25) is a probabilistic ranking function that often outperforms TF-IDF for information retrieval. Uses NLP tokenizers.
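A minimal indexing-and-search sketch (document ids and texts are made up):

```python
from tenets.core.nlp.keyword_extractor import BM25Calculator

bm25 = BM25Calculator(k1=1.2, b=0.75)
bm25.build_corpus([
    ("auth.py", "def login(user, password): verify credentials and issue a token"),
    ("db.py", "def connect(dsn): open a database connection pool"),
])

for doc_id, score in bm25.search("user login token", top_k=5):
    print(doc_id, round(score, 3))
```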
Initialize BM25 calculator.
PARAMETER | DESCRIPTION |
---|---|
k1 | Controls term frequency saturation. TYPE: float |
b | Controls length normalization. TYPE: float |
use_stopwords | Whether to filter stopwords. TYPE: bool |
stopword_set | Which stopword set to use. TYPE: str |
Source code in tenets/core/nlp/keyword_extractor.py
def __init__(
self,
k1: float = 1.2,
b: float = 0.75,
use_stopwords: bool = False,
stopword_set: str = "code",
):
"""Initialize BM25 calculator.
Args:
k1: Controls term frequency saturation
b: Controls length normalization
use_stopwords: Whether to filter stopwords
stopword_set: Which stopword set to use
"""
self.logger = get_logger(__name__)
self.k1 = k1
self.b = b
self.use_stopwords = use_stopwords
self.stopword_set = stopword_set
# Use NLP tokenizer
from .tokenizer import CodeTokenizer
self.tokenizer = CodeTokenizer(use_stopwords=use_stopwords)
# Core data structures
self.document_count = 0
self.document_frequency: Dict[str, int] = defaultdict(int)
self.document_lengths: Dict[str, int] = {}
self.document_tokens: Dict[str, List[str]] = {}
self.average_doc_length = 0.0
self.vocabulary: Set[str] = set()
self.idf_cache: Dict[str, float] = {}
Functions¶
tokenize¶
add_document¶
Add document to BM25 corpus.
PARAMETER | DESCRIPTION |
---|---|
doc_id | Unique document identifier. TYPE: str |
text | Document text content. TYPE: str |
Source code in tenets/core/nlp/keyword_extractor.py
def add_document(self, doc_id: str, text: str) -> None:
"""Add document to BM25 corpus.
Args:
doc_id: Unique document identifier
text: Document text content
"""
tokens = self.tokenize(text)
if not tokens:
self.document_lengths[doc_id] = 0
self.document_tokens[doc_id] = []
return
# Update corpus statistics
self.document_count += 1
self.document_lengths[doc_id] = len(tokens)
self.document_tokens[doc_id] = tokens
# Update document frequency
unique_terms = set(tokens)
for term in unique_terms:
self.document_frequency[term] += 1
self.vocabulary.add(term)
# Update average document length
total_length = sum(self.document_lengths.values())
self.average_doc_length = total_length / max(1, self.document_count)
# Clear IDF cache
self.idf_cache.clear()
compute_idf¶
Compute IDF component for BM25.
PARAMETER | DESCRIPTION |
---|---|
term | Term to compute IDF for. TYPE: str |
RETURNS | DESCRIPTION |
---|---|
float | IDF value |
Source code in tenets/core/nlp/keyword_extractor.py
def compute_idf(self, term: str) -> float:
"""Compute IDF component for BM25.
Args:
term: Term to compute IDF for
Returns:
IDF value
"""
if term in self.idf_cache:
return self.idf_cache[term]
df = self.document_frequency.get(term, 0)
# Use a smoothed, always-positive IDF variant to avoid zeros/negatives
# in tiny corpora and to better separate relevant docs:
# idf = log(1 + (N - df + 0.5)/(df + 0.5))
numerator = max(0.0, (self.document_count - df + 0.5))
denominator = df + 0.5
ratio = (numerator / denominator) if denominator > 0 else 0.0
idf = math.log(1.0 + ratio)
self.idf_cache[term] = idf
return idf
score_document¶
Calculate BM25 score for a document.
PARAMETER | DESCRIPTION |
---|---|
query_tokens | Tokenized query |
doc_id | Document identifier. TYPE: str |
RETURNS | DESCRIPTION |
---|---|
float | BM25 score |
Source code in tenets/core/nlp/keyword_extractor.py
def score_document(self, query_tokens: List[str], doc_id: str) -> float:
"""Calculate BM25 score for a document.
Args:
query_tokens: Tokenized query
doc_id: Document identifier
Returns:
BM25 score
"""
if doc_id not in self.document_tokens:
return 0.0
doc_tokens = self.document_tokens[doc_id]
if not doc_tokens:
return 0.0
doc_length = self.document_lengths[doc_id]
# Count term frequencies in document
doc_tf = Counter(doc_tokens)
score = 0.0
for term in query_tokens:
if term not in self.vocabulary:
continue
# IDF component
idf = self.compute_idf(term)
# Term frequency component with saturation
tf = doc_tf.get(term, 0)
# Length normalization factor
norm_factor = 1 - self.b + self.b * (doc_length / self.average_doc_length)
# BM25 formula
tf_component = (tf * (self.k1 + 1)) / (tf + self.k1 * norm_factor)
score += idf * tf_component
return score
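To see how the pieces combine, a single term's contribution can be worked out by hand with the same formula (numbers are arbitrary):

```python
import math

k1, b = 1.2, 0.75
tf, doc_len, avg_len = 2, 100, 120.0     # term frequency, document and average lengths
N, df = 10, 3                            # corpus size, documents containing the term

idf = math.log(1.0 + (N - df + 0.5) / (df + 0.5))       # ≈ 1.15
norm = 1 - b + b * (doc_len / avg_len)                   # ≈ 0.88
contribution = idf * (tf * (k1 + 1)) / (tf + k1 * norm)  # ≈ 1.65
```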
search¶
Search documents using BM25 ranking.
PARAMETER | DESCRIPTION |
---|---|
query | Search query. TYPE: str |
top_k | Number of top results to return. TYPE: int |
RETURNS | DESCRIPTION |
---|---|
List[Tuple[str, float]] | List of (doc_id, score) tuples sorted by score |
Source code in tenets/core/nlp/keyword_extractor.py
def search(self, query: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""Search documents using BM25 ranking.
Args:
query: Search query
top_k: Number of top results to return
Returns:
List of (doc_id, score) tuples sorted by score
"""
query_tokens = self.tokenize(query)
if not query_tokens:
return []
# Score all documents
scores = []
for doc_id in self.document_tokens:
score = self.score_document(query_tokens, doc_id)
if score > 0:
scores.append((doc_id, score))
# Sort by score
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
build_corpus¶
Build BM25 corpus from multiple documents.
PARAMETER | DESCRIPTION |
---|---|
documents | List of (doc_id, text) tuples |
Source code in tenets/core/nlp/keyword_extractor.py
def build_corpus(self, documents: List[Tuple[str, str]]) -> None:
"""Build BM25 corpus from multiple documents.
Args:
documents: List of (doc_id, text) tuples
"""
import os
cpu_count = os.cpu_count() or 1
self.logger.info(
f"Building BM25 corpus from {len(documents)} documents "
f"(sequential processing, CPU cores available: {cpu_count})"
)
for doc_id, text in documents:
self.add_document(doc_id, text)
self.logger.info(
f"BM25 corpus built: {self.document_count} documents, "
f"{len(self.vocabulary)} unique terms, "
f"avg doc length: {self.average_doc_length:.1f}"
)
TFIDFExtractor¶
Simple TF-IDF vectorizer with NLP tokenization.
Provides a scikit-learn-like interface with fit/transform methods returning dense vectors. Uses TextTokenizer for general text.
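A minimal fit/transform round trip (the documents are made up). Because the returned dense vectors are L2-normalized, cosine similarity reduces to a dot product:

```python
from tenets.core.nlp.keyword_extractor import TFIDFExtractor

docs = [
    "implement oauth2 authentication flow",
    "database connection pooling and retries",
]

extractor = TFIDFExtractor(use_stopwords=True, stopword_set="prompt")
vectors = extractor.fit(docs).transform(docs)

# Dot product of two unit-length vectors == cosine similarity.
similarity = sum(a * b for a, b in zip(vectors[0], vectors[1]))
```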
Initialize the extractor.
PARAMETER | DESCRIPTION |
---|---|
use_stopwords | Whether to filter stopwords. TYPE: bool |
stopword_set | Which stopword set to use ('prompt' or 'code'). TYPE: str |
Source code in tenets/core/nlp/keyword_extractor.py
def __init__(self, use_stopwords: bool = True, stopword_set: str = "prompt"):
"""Initialize the extractor.
Args:
use_stopwords: Whether to filter stopwords
stopword_set: Which stopword set to use ('prompt'|'code')
"""
self.logger = get_logger(__name__)
self.use_stopwords = use_stopwords
self.stopword_set = stopword_set
# Tokenizer for general text
from .tokenizer import TextTokenizer
self.tokenizer = TextTokenizer(use_stopwords=use_stopwords)
# Learned state
self._fitted = False
self._vocabulary: List[str] = []
self._term_to_index: Dict[str, int] = {}
self._idf: Dict[str, float] = {}
self._doc_count: int = 0
self._df: Dict[str, int] = defaultdict(int)
Functions¶
fit¶
Learn vocabulary and IDF from documents.
PARAMETER | DESCRIPTION |
---|---|
documents | List of input texts |
RETURNS | DESCRIPTION |
---|---|
TFIDFExtractor | self |
Source code in tenets/core/nlp/keyword_extractor.py
def fit(self, documents: List[str]) -> "TFIDFExtractor":
"""Learn vocabulary and IDF from documents.
Args:
documents: List of input texts
Returns:
self
"""
self._doc_count = 0
self._df.clear()
for doc in documents or []:
tokens = self.tokenizer.tokenize(doc)
if not tokens:
continue
self._doc_count += 1
for term in set(tokens):
self._df[term] += 1
# Build vocabulary in deterministic order
self._vocabulary = list(self._df.keys())
self._vocabulary.sort()
self._term_to_index = {t: i for i, t in enumerate(self._vocabulary)}
# Compute smoothed IDF
self._idf = {}
for term, df in self._df.items():
# log((N + 1) / (df + 1)) to avoid div by zero and dampen extremes
self._idf[term] = (
math.log((self._doc_count + 1) / (df + 1)) if self._doc_count > 0 else 0.0
)
self._fitted = True
return self
transform¶
Transform documents to dense TF-IDF vectors.
PARAMETER | DESCRIPTION |
---|---|
documents | List of input texts |
RETURNS | DESCRIPTION |
---|---|
List[List[float]] | List of dense vectors (each aligned to the learned vocabulary) |
Source code in tenets/core/nlp/keyword_extractor.py
def transform(self, documents: List[str]) -> List[List[float]]:
"""Transform documents to dense TF-IDF vectors.
Args:
documents: List of input texts
Returns:
List of dense vectors (each aligned to the learned vocabulary)
"""
if not self._fitted:
raise RuntimeError("TFIDFExtractor not fitted. Call fit(documents) first.")
vectors: List[List[float]] = []
vocab_size = len(self._vocabulary)
for doc in documents or []:
tokens = self.tokenizer.tokenize(doc)
if not tokens or vocab_size == 0:
vectors.append([])
continue
# Sublinear TF
tf_raw = Counter(t for t in tokens if t in self._term_to_index)
if not tf_raw:
vectors.append([0.0] * vocab_size if vocab_size <= 2048 else [])
continue
tf_scores = {term: 1.0 + math.log(cnt) for term, cnt in tf_raw.items()}
# Build dense vector
vec = [0.0] * vocab_size
for term, tf in tf_scores.items():
idx = self._term_to_index[term]
idf = self._idf.get(term, 0.0)
vec[idx] = tf * idf
# L2 normalize
norm = math.sqrt(sum(x * x for x in vec))
if norm > 0:
vec = [x / norm for x in vec]
vectors.append(vec)
return vectors