"""Fetch top Hacker News stories, index them in Weaviate via a parent-document
retriever, and post preference-matched summaries to Slack."""

import asyncio
import logging
import os
from typing import Iterable, TypedDict

import langchain
import langchain.chat_models
import langchain.prompts
import langchain.retrievers
import langchain.text_splitter
import langchain_core
import langchain_core.documents
import langchain_openai
import langchain_weaviate
import langgraph.graph
import slack
import weaviate

from hn import HackerNewsClient, Story
from scrape import JinaScraper
from util import DocumentLocalFileStore

NUM_STORIES = 40  # Number of top stories to fetch from Hacker News
USER_PREFERENCES = ["Machine Learning", "Programming", "DevOps"]
ENABLE_SLACK = True  # Send updates to Slack; requires the SLACK_BOT_TOKEN env var
ENABLE_MLFLOW_TRACING = True  # Use MLflow (at http://localhost:5000) for tracing

llm = langchain.chat_models.init_chat_model(
    model="gpt-4o-mini", model_provider="openai"
)
embeddings = langchain_openai.OpenAIEmbeddings(model="text-embedding-3-large")

weaviate_client = weaviate.connect_to_local()
vector_store = langchain_weaviate.WeaviateVectorStore(
    weaviate_client,
    index_name="hn_stories",
    text_key="page_content",
    embedding=embeddings,
)
splitter = langchain.text_splitter.RecursiveCharacterTextSplitter(
    chunk_size=2000,
    chunk_overlap=200,
)
doc_store = DocumentLocalFileStore(root_path="data/documents")

# Child chunks are embedded in Weaviate; the full parent documents live in the
# local file store and are what retrieval returns.
retriever = langchain.retrievers.ParentDocumentRetriever(
    vectorstore=vector_store,
    docstore=doc_store,
    child_splitter=splitter,
)


class State(TypedDict):
    preferences: Iterable[str]
    context: list[langchain_core.documents.Document]
    answer: str
    summaries: list[dict]


def retrieve_docs(state: State, top_n: int = 3):
    """Retrieve relevant documents based on user preferences."""
    docs = []
    for preference in state["preferences"]:
        logging.info(f"Retrieving documents for preference: {preference}")
        docs.extend(retriever.invoke(preference)[:top_n])
    return {"context": docs}


def generate_structured_summaries(state: State):
    """Generate structured summaries for each story individually."""
    summaries = []

    for doc in state["context"]:
        # Create a prompt for individual story summarization
        prompt = langchain.prompts.PromptTemplate(
            input_variables=[
                "preferences", "title", "content", "source", "categories"
            ],
            template=(
                "You are a helpful assistant that summarizes technology articles.\n\n"
                "User preferences: {preferences}\n\n"
                "Article title: {title}\n"
                "Article categories: {categories}\n"
                "Article content: {content}\n"
                "Source URL: {source}\n\n"
                "Use an informative but not too formal tone.\n"
                "Please provide:\n"
                "1. A concise summary (around 50 words) that highlights the key insights from the article.\n"
                "2. The single user preference that this article best matches (or 'Other' if none match well)\n\n"
                "Format your response as:\n"
                "PREFERENCE: [preference name or 'Other']\n"
                "SUMMARY: [your summary here]\n"
            ),
        )

        messages = prompt.invoke(
            {
                "preferences": ", ".join(state["preferences"]),
                "title": doc.metadata.get("title", "Unknown Title"),
                "content": doc.page_content[:5000],  # Limit content length for the LLM
                "source": doc.metadata.get("source", ""),
                "categories": ", ".join(doc.metadata.get("categories", [])),
            }
        )
        response = llm.invoke(messages).content

        # Parse the LLM response to extract preference and summary
        response_text = response if isinstance(response, str) else str(response)
        lines = response_text.strip().split("\n")
        matched_preference = "Other"
        summary_text = response_text

        for line in lines:
            if line.startswith("PREFERENCE:"):
                matched_preference = line.replace("PREFERENCE:", "").strip()
            elif line.startswith("SUMMARY:"):
                summary_text = line.replace("SUMMARY:", "").strip()

        # If the response lacks the structured format, use it verbatim as the summary
        if not any(line.startswith("SUMMARY:") for line in lines):
            summary_text = response_text.strip()

        summaries.append(
            {
                "title": doc.metadata.get("title", "Unknown Title"),
                "summary": summary_text,
                "source_url": doc.metadata.get("source", ""),
                "categories": doc.metadata.get("categories", []),
                "story_id": doc.metadata.get("story_id"),
                "matched_preference": matched_preference,
            }
        )

    return {"summaries": summaries}

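
# The function above parses the PREFERENCE:/SUMMARY: lines by hand. A more
# robust alternative is LangChain's structured-output support. A minimal
# sketch follows; it is not wired into the graph, and the StorySummary model
# and summarize_structured helper are hypothetical names, not part of the
# pipeline.
import pydantic


class StorySummary(pydantic.BaseModel):
    preference: str  # Best-matching user preference, or "Other"
    summary: str  # Concise (~50 word) summary of the article


def summarize_structured(messages) -> StorySummary:
    """Ask the LLM for a typed response instead of parsing labeled lines."""
    structured_llm = llm.with_structured_output(StorySummary)
    return structured_llm.invoke(messages)
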

def group_stories_by_preference(
    summaries: list[dict], preferences: list[str]
) -> dict[str, list[dict]]:
    """Group stories by their matched preference, in the order of user preferences."""
    # Group stories by the LLM-determined preference
    preference_groups: dict[str, list[dict]] = {}
    for summary in summaries:
        matched_preference = summary.get("matched_preference", "Other")
        preference_groups.setdefault(matched_preference, []).append(summary)

    # Order groups by the user's preference list, with "Other" last
    ordered_groups = {}
    for preference in preferences:
        if preference in preference_groups:
            ordered_groups[preference] = preference_groups[preference]
    if "Other" in preference_groups:
        ordered_groups["Other"] = preference_groups["Other"]

    return ordered_groups


def create_slack_blocks(summaries: list[dict], preferences: list[str]) -> list[dict]:
    """Convert structured summaries into Slack blocks grouped by user preference."""
    grouped_stories = group_stories_by_preference(summaries, preferences)
    return slack.format_slack_blocks(grouped_stories)


def run_structured_query(preferences: Iterable[str]) -> list[dict]:
    """Run the retrieve -> summarize graph and return structured summary data."""
    graph_builder = langgraph.graph.StateGraph(State).add_sequence(
        [retrieve_docs, generate_structured_summaries]
    )
    graph_builder.add_edge(langgraph.graph.START, "retrieve_docs")
    graph = graph_builder.compile()

    response = graph.invoke(
        State(preferences=preferences, context=[], answer="", summaries=[])
    )
    return response["summaries"]


def get_existing_story_ids() -> set[str]:
    """Get the IDs of stories that already exist in the vector store."""
    try:
        # Query the collection through the public Weaviate client API rather
        # than reaching into the vector store's private attributes.
        collection = weaviate_client.collections.get("hn_stories")
        existing_ids = set()
        for obj in collection.iterator():
            story_id = obj.properties.get("story_id")
            if story_id:
                existing_ids.add(story_id)
        return existing_ids
    except Exception:
        logging.warning("Could not retrieve existing story IDs", exc_info=True)
        return set()

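
# get_existing_story_ids fetches every property of every object. For a large
# collection the iterator can be restricted to the one property we need. A
# minimal sketch, assuming weaviate-client v4's iterator(return_properties=...)
# parameter; this variant is unused by the pipeline.
def get_existing_story_ids_fast() -> set[str]:
    collection = weaviate_client.collections.get("hn_stories")
    return {
        obj.properties["story_id"]
        for obj in collection.iterator(return_properties=["story_id"])
        if obj.properties.get("story_id")
    }
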

async def fetch_hn_top_stories(
    limit: int = 10,
    force_fetch: bool = False,
) -> list[langchain_core.documents.Document]:
    """Fetch top HN stories not yet indexed and scrape their page content."""
    hn = HackerNewsClient()
    stories = await hn.get_top_stories(limit=limit)

    # Get existing story IDs to avoid re-fetching
    existing_ids = get_existing_story_ids()
    logging.info(f"Existing story IDs: {len(existing_ids)} found in vector store")
    new_stories = [story for story in stories if story.id not in existing_ids]
    logging.info(f"Found {len(stories)} top stories, {len(new_stories)} are new")

    if not new_stories:
        if not force_fetch:
            logging.info("No new stories to fetch")
            return []
        logging.info("Force fetching all top stories regardless of existing IDs")
        new_stories = stories

    # Fetch content for each new story concurrently
    scraper = JinaScraper(os.getenv("JINA_API_KEY"))

    async def _fetch_content(story: Story) -> tuple[str, str]:
        if not story.url:
            return story.id, story.title
        return story.id, await scraper.get_content(story.url)

    tasks = [_fetch_content(story) for story in new_stories]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Drop failed fetches and collect the rest into a dict keyed by story ID
    contents: dict[str, str] = {}
    for result in results:
        if isinstance(result, BaseException):
            logging.warning(f"Failed to fetch story content: {result}")
            continue
        story_id, content = result
        contents[story_id] = content

    documents = [
        langchain_core.documents.Document(
            page_content=contents[story.id],
            metadata={
                "story_id": story.id,
                "title": story.title,
                "source": story.url,
                "created_at": story.created_at.isoformat(),
            },
        )
        for story in new_stories
        if story.id in contents  # Skip stories whose content fetch failed
    ]
    return documents


async def main():
    if ENABLE_MLFLOW_TRACING:
        import mlflow

        mlflow.set_tracking_uri("http://localhost:5000")
        mlflow.set_experiment("langchain-rag-hn")
        mlflow.langchain.autolog()

    # 1. Load only new stories
    new_stories = await fetch_hn_top_stories(limit=NUM_STORIES)
    if new_stories:
        print(f"Processing {len(new_stories)} new stories")

        # Categorize stories (optional)
        from classify import categorize

        for story in new_stories:
            categories = categorize(story, llm)
            story.metadata["categories"] = list(categories)
            print(
                f"Story ID {story.metadata['story_id']} "
                f"({story.metadata['title']}) categorized as: {categories}"
            )

        # 2. Split & 3. Store
        retriever.add_documents(
            new_stories, ids=[doc.metadata["story_id"] for doc in new_stories]
        )
        print(f"Added {len(new_stories)} documents to document store")
    else:
        print("No new stories to process")

    # 4. Query
    summaries = run_structured_query(USER_PREFERENCES)
    if ENABLE_SLACK:
        blocks = create_slack_blocks(summaries, USER_PREFERENCES)
        slack.send_message(channel="#ragpull-demo", blocks=blocks)
    print(summaries)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(main())
    finally:
        weaviate_client.close()
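
# Running this module assumes (based on the clients configured above):
#   OPENAI_API_KEY  - for the gpt-4o-mini chat model and text-embedding-3-large
#   JINA_API_KEY    - for JinaScraper content fetching
#   SLACK_BOT_TOKEN - only if ENABLE_SLACK is True
# plus a local Weaviate instance (weaviate.connect_to_local()) and, when
# ENABLE_MLFLOW_TRACING is True, an MLflow server at http://localhost:5000.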