From 547da4517ad540953445d36dae008ad8dee1f193 Mon Sep 17 00:00:00 2001 From: Adrian Rumpold Date: Tue, 1 Jul 2025 14:16:00 +0200 Subject: [PATCH] Per-article summarization and preference matching --- indexing.py | 153 +++++++++++++++++++++++++++++++++++++++++----------- slack.py | 91 ++++++++++++++++++++----------- 2 files changed, 181 insertions(+), 63 deletions(-) diff --git a/indexing.py b/indexing.py index 7929362..9fb2d4e 100644 --- a/indexing.py +++ b/indexing.py @@ -19,7 +19,7 @@ from scrape import JinaScraper NUM_STORIES = 20 USER_PREFERENCES = ["Machine Learning", "Linux", "Open-Source"] -ENABLE_SLACK = False # Send updates to Slack, need to set SLACK_BOT_TOKEN env var +ENABLE_SLACK = True # Send updates to Slack, need to set SLACK_BOT_TOKEN env var ENABLE_MLFLOW_TRACING = False # Use MLflow (at http://localhost:5000) for tracing @@ -41,12 +41,13 @@ class State(TypedDict): preferences: Iterable[str] context: list[langchain_core.documents.Document] answer: str + summaries: list[dict] -def retrieve(state: State): +def retrieve(state: State, top_n: int = 5) -> State: # Search for relevant documents retrieved_docs = vector_store.similarity_search( - "Categories: " + ", ".join(state["preferences"]), k=10 + "Categories: " + ", ".join(state["preferences"]), k=20 ) # If you're using chunks, group them back into complete stories @@ -76,39 +77,126 @@ def retrieve(state: State): ) complete_stories.append(complete_story) - return {"context": complete_stories[:5]} # Limit to top 5 stories + return {"context": complete_stories[:top_n]} -def generate(state: State): - docs_content = "\n\n".join(doc.page_content for doc in state["context"]) +def generate_structured_summaries(state: State): + """Generate structured summaries for each story individually.""" + summaries = [] - prompt = langchain.prompts.PromptTemplate( - input_variables=["preferences", "context"], - template=( - "You are a helpful assistant that can provide updates on technology topics based on the topics a 
user has expressed interest in and additional context.\n\n" - "Please respond in Markdown format and group your answers based on the categories of the items in the context.\n" - "If applicable, add hyperlinks to the original source as part of the headline for each story.\n" - "Limit your summaries to approximately 100 words per item.\n\n" - "Preferences: {preferences}\n\n" - "Context:\n{context}\n\n" - "Answer:" - ), + for doc in state["context"]: + # Create a prompt for individual story summarization + prompt = langchain.prompts.PromptTemplate( + input_variables=["preferences", "title", "content", "source", "categories"], + template=( + "You are a helpful assistant that summarizes technology articles.\n\n" + "User preferences: {preferences}\n\n" + "Article title: {title}\n" + "Article categories: {categories}\n" + "Article content: {content}\n" + "Source URL: {source}\n\n" + "Use an informative but not too formal tone.\n" + "Please provide:\n" + "1. A concise summary (around 50 words) that highlights the key insights from the article.\n" + "2. 
The single user preference that this article best matches (or 'Other' if none match well)\n\n" + "Format your response as:\n" + "PREFERENCE: [preference name or 'Other']\n" + "SUMMARY: [your summary here]\n" + ), + ) + + messages = prompt.invoke( + { + "preferences": ", ".join(state["preferences"]), + "title": doc.metadata.get("title", "Unknown Title"), + "content": doc.page_content[:5000], # Limit content length for LLM + "source": doc.metadata.get("source", ""), + "categories": ", ".join(doc.metadata.get("categories", [])), + } + ) + + response = llm.invoke(messages).content + + # Parse the LLM response to extract preference and summary + response_text = response if isinstance(response, str) else str(response) + lines = response_text.strip().split("\n") + matched_preference = "Other" + summary_text = response_text + + for line in lines: + if line.startswith("PREFERENCE:"): + matched_preference = line.replace("PREFERENCE:", "").strip() + elif line.startswith("SUMMARY:"): + summary_text = line.replace("SUMMARY:", "").strip() + + # If we didn't find the structured format, use the whole response as summary + if not any(line.startswith("SUMMARY:") for line in lines): + summary_text = response_text.strip() + + summaries.append( + { + "title": doc.metadata.get("title", "Unknown Title"), + "summary": summary_text, + "source_url": doc.metadata.get("source", ""), + "categories": doc.metadata.get("categories", []), + "story_id": doc.metadata.get("story_id"), + "matched_preference": matched_preference, + } + ) + + return {"summaries": summaries} + + +def group_stories_by_preference( + summaries: list[dict], preferences: list[str] +) -> dict[str, list[dict]]: + """Group stories by their matched preferences in the order of user preferences.""" + preference_groups = {} + + # Group stories by the LLM-determined preference matching + for summary in summaries: + matched_preference = summary.get("matched_preference", "Other") + + if matched_preference not in preference_groups: + 
preference_groups[matched_preference] = [] + preference_groups[matched_preference].append(summary) + + # Create ordered groups based on user preferences + ordered_groups = {} + + # Add groups for user preferences in order + for preference in preferences: + if preference in preference_groups: + ordered_groups[preference] = preference_groups[preference] + + # Add "Other" group at the end if it exists + if "Other" in preference_groups: + ordered_groups["Other"] = preference_groups["Other"] + + return ordered_groups + + +def create_slack_blocks(summaries: list[dict], preferences: list[str]) -> list[dict]: + """Convert structured summaries into Slack block format grouped by user preferences.""" + grouped_stories = group_stories_by_preference(summaries, preferences) + return slack.format_slack_blocks(grouped_stories) + + +def run_structured_query( + preferences: Iterable[str], +) -> list[dict]: + """Run query and return structured summary data.""" + graph_builder = langgraph.graph.StateGraph(State).add_sequence( + [retrieve, generate_structured_summaries] ) - - messages = prompt.invoke( - {"preferences": state["preferences"], "context": docs_content} - ) - response = llm.invoke(messages) - return {"answer": response.content} - - -def run_query(preferences: Iterable[str]) -> str: - graph_builder = langgraph.graph.StateGraph(State).add_sequence([retrieve, generate]) graph_builder.add_edge(langgraph.graph.START, "retrieve") graph = graph_builder.compile() - response = graph.invoke(State(preferences=preferences, context=[], answer="")) - return response["answer"] + response = graph.invoke( + State(preferences=preferences, context=[], answer="", summaries=[]) + ) + summaries = response["summaries"] + return summaries def get_existing_story_ids() -> set[str]: @@ -235,10 +323,11 @@ async def main(): print("No new stories to process") # 4. 
Query - answer = run_query(USER_PREFERENCES) - print(answer) + summaries = run_structured_query(USER_PREFERENCES) if ENABLE_SLACK: - slack.send_message(channel="#ragpull-demo", text=answer) + blocks = create_slack_blocks(summaries, USER_PREFERENCES) + slack.send_message(channel="#ragpull-demo", blocks=blocks) + print(summaries) if __name__ == "__main__": diff --git a/slack.py b/slack.py index 672bde0..699ee18 100644 --- a/slack.py +++ b/slack.py @@ -1,52 +1,81 @@ import logging import os -from langchain_core.documents import Document from slack_sdk import WebClient from slack_sdk.errors import SlackApiError -def prepare_message_blocks(stories: list[Document]) -> list: - blocks = [] - for story in stories: - block = [ - { - "type": "header", - "text": {"type": "plain_text", "text": story.metadata["title"]}, +def format_story(story: dict) -> list: + title_text = ( + f"<{story['source_url']}|{story['title']}>" + if story["source_url"] + else story["title"] + ) + return [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": f"*{title_text}*", }, - { - "type": "context", - "elements": [ - { - "type": "plain_text", - "text": f"Categories: {', '.join(story.metadata.get('categories', []))}", - }, - ], + }, + { + "type": "context", + "elements": [ + { + "type": "plain_text", + "text": f"Categories: {', '.join(story['categories'])}" + if story["categories"] + else "No categories", + } + ], + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": story["summary"], }, - { - "type": "context", - "elements": [ - { - "type": "plain_text", - "text": f"Posted on: {story.metadata['created_at']}", - } - ], - }, - {"type": "section", "text": {"type": "mrkdwn", "text": story.page_content}}, - ] + }, + ] + + +def format_slack_blocks(grouped_stories: dict[str, list[dict]]) -> list[dict]: + """Format grouped stories into Slack block format.""" + blocks = [] + + # Header block + blocks.append( + {"type": "header", "text": {"type": "plain_text", "text": "🚀 Tech 
Updates"}} + ) + + # Add stories for each group + for group_name, stories in grouped_stories.items(): + # Group section header + section_title = ( + "*Other Stories*" if group_name == "Other" else f"*{group_name}*" + ) + blocks.append( + {"type": "section", "text": {"type": "mrkdwn", "text": section_title}} + ) + + for story in stories: + blocks.extend(format_story(story)) + + # Add divider after each group + blocks.append({"type": "divider"}) - blocks.append(block) return blocks -def send_message(channel: str, text: str) -> None: +def send_message(channel: str, blocks: list) -> None: client = WebClient(token=os.environ["SLACK_BOT_TOKEN"]) try: response = client.chat_postMessage( channel=channel, - username="HN Ragandy", - text=text, + text="Tech updates", + blocks=blocks, unfurl_links=False, ) response.validate()