Per-article summarization and preference matching

This commit is contained in:
Adrian Rumpold
2025-07-01 14:16:00 +02:00
parent 77497ed56b
commit 547da4517a
2 changed files with 181 additions and 63 deletions

View File

@@ -19,7 +19,7 @@ from scrape import JinaScraper
NUM_STORIES = 20 NUM_STORIES = 20
USER_PREFERENCES = ["Machine Learning", "Linux", "Open-Source"] USER_PREFERENCES = ["Machine Learning", "Linux", "Open-Source"]
ENABLE_SLACK = False # Send updates to Slack, need to set SLACK_BOT_TOKEN env var ENABLE_SLACK = True # Send updates to Slack, need to set SLACK_BOT_TOKEN env var
ENABLE_MLFLOW_TRACING = False # Use MLflow (at http://localhost:5000) for tracing ENABLE_MLFLOW_TRACING = False # Use MLflow (at http://localhost:5000) for tracing
@@ -41,12 +41,13 @@ class State(TypedDict):
preferences: Iterable[str] preferences: Iterable[str]
context: list[langchain_core.documents.Document] context: list[langchain_core.documents.Document]
answer: str answer: str
summaries: list[dict]
def retrieve(state: State): def retrieve(state: State, top_n: int = 5) -> State:
# Search for relevant documents # Search for relevant documents
retrieved_docs = vector_store.similarity_search( retrieved_docs = vector_store.similarity_search(
"Categories: " + ", ".join(state["preferences"]), k=10 "Categories: " + ", ".join(state["preferences"]), k=20
) )
# If you're using chunks, group them back into complete stories # If you're using chunks, group them back into complete stories
@@ -76,39 +77,126 @@ def retrieve(state: State):
) )
complete_stories.append(complete_story) complete_stories.append(complete_story)
return {"context": complete_stories[:5]} # Limit to top 5 stories return {"context": complete_stories[:top_n]}
def _parse_structured_response(response_text: str) -> tuple[str, str]:
    """Split an LLM reply into ``(matched_preference, summary)``.

    The reply is expected to contain a ``PREFERENCE:`` line and a ``SUMMARY:``
    line. Lines following ``SUMMARY:`` are treated as a continuation of the
    summary. If no usable summary is found, the whole reply is returned as the
    summary and the preference defaults to ``"Other"`` unless a
    ``PREFERENCE:`` line was seen.
    """
    matched_preference = "Other"
    summary_lines = None  # None means "no SUMMARY: marker seen yet"
    collecting_summary = False

    for raw_line in response_text.strip().splitlines():
        line = raw_line.strip()
        if line.startswith("PREFERENCE:"):
            # The prompt shows the value inside brackets/quotes, and models
            # frequently echo them back, so strip that decoration.
            label = line.removeprefix("PREFERENCE:").strip().strip("[]'\"").strip()
            matched_preference = label or "Other"
            collecting_summary = False
        elif line.startswith("SUMMARY:"):
            # A later SUMMARY: marker replaces an earlier one.
            summary_lines = [line.removeprefix("SUMMARY:").strip()]
            collecting_summary = True
        elif collecting_summary:
            summary_lines.append(line)

    summary_text = "\n".join(summary_lines).strip() if summary_lines else ""
    if not summary_text:
        # Structured format missing (or summary empty): use the whole reply.
        summary_text = response_text.strip()
    return matched_preference, summary_text


def generate_structured_summaries(state: State):
    """Generate a structured summary for each story in ``state["context"]``.

    Every document is summarized with its own LLM call (via the ``llm`` object
    defined elsewhere in this module). The reply is parsed into a summary and
    the single user preference the model judged to be the best match.

    Args:
        state: Graph state; ``preferences`` and ``context`` are read.

    Returns:
        A partial state update ``{"summaries": [...]}`` with one dict per
        document containing ``title``, ``summary``, ``source_url``,
        ``categories``, ``story_id`` and ``matched_preference``.
    """
    # The template and the joined preferences do not depend on the story,
    # so they are built once instead of once per document.
    prompt = langchain.prompts.PromptTemplate(
        input_variables=["preferences", "title", "content", "source", "categories"],
        template=(
            "You are a helpful assistant that summarizes technology articles.\n\n"
            "User preferences: {preferences}\n\n"
            "Article title: {title}\n"
            "Article categories: {categories}\n"
            "Article content: {content}\n"
            "Source URL: {source}\n\n"
            "Use an informative but not too formal tone.\n"
            "Please provide:\n"
            "1. A concise summary (around 50 words) that highlights the key insights from the article.\n"
            "2. The single user preference that this article best matches (or 'Other' if none match well)\n\n"
            "Format your response as:\n"
            "PREFERENCE: [preference name or 'Other']\n"
            "SUMMARY: [your summary here]\n"
        ),
    )
    preferences_text = ", ".join(state["preferences"])

    summaries = []
    for doc in state["context"]:
        title = doc.metadata.get("title", "Unknown Title")
        source = doc.metadata.get("source", "")
        categories = doc.metadata.get("categories", [])

        messages = prompt.invoke(
            {
                "preferences": preferences_text,
                "title": title,
                "content": doc.page_content[:5000],  # Limit content length for LLM
                "source": source,
                "categories": ", ".join(categories),
            }
        )
        response = llm.invoke(messages).content
        # The content is not guaranteed to be a plain string; coerce it.
        response_text = response if isinstance(response, str) else str(response)
        matched_preference, summary_text = _parse_structured_response(response_text)

        summaries.append(
            {
                "title": title,
                "summary": summary_text,
                "source_url": source,
                "categories": categories,
                "story_id": doc.metadata.get("story_id"),
                "matched_preference": matched_preference,
            }
        )

    return {"summaries": summaries}
def group_stories_by_preference(
    summaries: list[dict], preferences: list[str]
) -> dict[str, list[dict]]:
    """Group stories by matched preference, ordered like the user preferences.

    Labels are compared to the user preferences ignoring case and surrounding
    whitespace, because they are free-form text. Any story whose label does
    not correspond to a user preference (including a missing label) is placed
    in the ``"Other"`` group, so no story is ever dropped.

    Args:
        summaries: Story dicts; the ``matched_preference`` key is read.
        preferences: User preferences in display order.

    Returns:
        A dict mapping each preference that has at least one story to its
        stories, in the order of ``preferences``, followed by ``"Other"`` if
        there are unmatched stories.
    """
    ordered_preferences = list(preferences)

    # Normalized label -> preference exactly as the user spelled it.
    canonical = {}
    for preference in ordered_preferences:
        canonical.setdefault(preference.strip().casefold(), preference)

    # Pre-seeding the buckets in preference order fixes the output order.
    buckets = {preference: [] for preference in ordered_preferences}
    unmatched = []
    for summary in summaries:
        label = str(summary.get("matched_preference") or "Other")
        preference = canonical.get(label.strip().casefold())
        if preference is None:
            unmatched.append(summary)
        else:
            buckets[preference].append(summary)

    ordered_groups = {
        preference: stories for preference, stories in buckets.items() if stories
    }
    if unmatched:
        # setdefault keeps stories already grouped under a user preference
        # that is itself spelled "Other".
        ordered_groups.setdefault("Other", []).extend(unmatched)
    return ordered_groups
def create_slack_blocks(summaries: list[dict], preferences: list[str]) -> list[dict]:
    """Build the Slack blocks for a set of story summaries.

    The summaries are first grouped by user preference and the grouped result
    is then handed to ``slack.format_slack_blocks``.
    """
    return slack.format_slack_blocks(
        group_stories_by_preference(summaries, preferences)
    )
def run_structured_query(
    preferences: Iterable[str],
) -> list[dict]:
    """Run the retrieve-then-summarize graph and return its summaries.

    Args:
        preferences: Topics of interest; stored in the initial graph state.

    Returns:
        The ``summaries`` entry of the final graph state.
    """
    # Two-step pipeline: retrieval followed by per-story summarization.
    pipeline = langgraph.graph.StateGraph(State).add_sequence(
        [retrieve, generate_structured_summaries]
    )
    pipeline.add_edge(langgraph.graph.START, "retrieve")

    initial_state = State(
        preferences=preferences, context=[], answer="", summaries=[]
    )
    final_state = pipeline.compile().invoke(initial_state)
    return final_state["summaries"]
def get_existing_story_ids() -> set[str]: def get_existing_story_ids() -> set[str]:
@@ -235,10 +323,11 @@ async def main():
print("No new stories to process") print("No new stories to process")
# 4. Query # 4. Query
answer = run_query(USER_PREFERENCES) summaries = run_structured_query(USER_PREFERENCES)
print(answer)
if ENABLE_SLACK: if ENABLE_SLACK:
slack.send_message(channel="#ragpull-demo", text=answer) blocks = create_slack_blocks(summaries, USER_PREFERENCES)
slack.send_message(channel="#ragpull-demo", blocks=blocks)
print(summaries)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,52 +1,81 @@
import logging import logging
import os import os
from langchain_core.documents import Document
from slack_sdk import WebClient from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError from slack_sdk.errors import SlackApiError
def format_story(story: dict) -> list:
    """Render one summarized story as a list of Slack blocks.

    Args:
        story: Dict with the keys ``title``, ``source_url``, ``categories``
            and ``summary``.

    Returns:
        Three blocks: a section with the bold title (linked when a source URL
        is present), a context line listing the categories, and a section
        with the summary text.

    Raises:
        KeyError: If one of the required keys is missing from ``story``.
    """
    # Slack treats &, < and > as control characters in mrkdwn text; an
    # unescaped one in the title would corrupt the <url|label> link syntax.
    safe_title = (
        story["title"].replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
    )
    if story["source_url"]:
        title_text = f"<{story['source_url']}|{safe_title}>"
    else:
        title_text = safe_title

    if story["categories"]:
        categories_text = f"Categories: {', '.join(story['categories'])}"
    else:
        categories_text = "No categories"

    # A section block with empty text is rejected by Slack, so fall back to a
    # placeholder rather than failing the whole message.
    summary_text = story["summary"]
    if not summary_text.strip():
        summary_text = "_No summary available._"

    return [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{title_text}*",
            },
        },
        {
            "type": "context",
            "elements": [
                {
                    "type": "plain_text",
                    "text": categories_text,
                }
            ],
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": summary_text,
            },
        },
    ]
def format_slack_blocks(grouped_stories: dict[str, list[dict]]) -> list[dict]:
    """Format grouped stories into a flat list of Slack blocks.

    The output starts with a header block. Each group then contributes a
    section with its title (``"Other"`` is shown as "Other Stories") followed
    by the blocks of its stories. A divider is placed between consecutive
    groups, but not after the last one.

    Args:
        grouped_stories: Mapping of group name to the stories in that group;
            iteration order determines display order.

    Returns:
        The list of block dicts.
    """
    # NOTE(review): Slack caps the number of blocks per message; confirm the
    # story count used by callers stays below that limit.
    blocks = [
        {"type": "header", "text": {"type": "plain_text", "text": "🚀 Tech Updates"}}
    ]

    for index, (group_name, stories) in enumerate(grouped_stories.items()):
        if index > 0:
            # Emitting the divider before every group except the first puts
            # dividers strictly between groups, with no trailing one.
            blocks.append({"type": "divider"})

        section_title = (
            "*Other Stories*" if group_name == "Other" else f"*{group_name}*"
        )
        blocks.append(
            {"type": "section", "text": {"type": "mrkdwn", "text": section_title}}
        )
        for story in stories:
            blocks.extend(format_story(story))

    return blocks
def send_message(channel: str, text: str) -> None: def send_message(channel: str, blocks: list) -> None:
client = WebClient(token=os.environ["SLACK_BOT_TOKEN"]) client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
try: try:
response = client.chat_postMessage( response = client.chat_postMessage(
channel=channel, channel=channel,
username="HN Ragandy", text="Tech updates",
text=text, blocks=blocks,
unfurl_links=False, unfurl_links=False,
) )
response.validate() response.validate()