Working initial version
This commit is contained in:
232
src/formex_viewer/main.py
Normal file
232
src/formex_viewer/main.py
Normal file
@@ -0,0 +1,232 @@
|
||||
import io
|
||||
import logging
|
||||
import typing
|
||||
import zipfile
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
from lxml import etree
|
||||
|
||||
from formex_viewer import formex4
|
||||
|
||||
|
||||
class SystemName(StrEnum):
|
||||
CELEX = "celex"
|
||||
CELLAR = "cellar"
|
||||
OJ = "oj"
|
||||
|
||||
|
||||
@dataclass
class CellarIdentifier:
    """A resource identifier: an identifier scheme plus an ID within that scheme."""

    # Scheme the ID belongs to (CELEX, CELLAR or OJ).
    system_name: SystemName
    # ID within the scheme, e.g. "32024R2847" for a CELEX number.
    system_id: str
|
||||
|
||||
|
||||
class Language(StrEnum):
|
||||
"""Language enum for Cellar"""
|
||||
|
||||
ENG = "eng"
|
||||
DEU = "deu"
|
||||
FRA = "fra"
|
||||
ITA = "ita"
|
||||
|
||||
|
||||
class ContentType(StrEnum):
|
||||
XML_FMX4 = "application/xml;mtype=fmx4"
|
||||
ZIP_FMX4 = "application/zip;mtype=fmx4"
|
||||
XHTML_XML = "application/xhtml+xml"
|
||||
|
||||
|
||||
@dataclass
class Metadata:
    """The part of a Cellar metadata notice needed to fetch a publication's text."""

    # Identifier read from the first NOTICE/EXPRESSION/URI element of the notice.
    publication_text: CellarIdentifier

    @classmethod
    def from_xml(cls, xmlstr: str) -> "Metadata":
        """Parse XML metadata.

        Args:
            xmlstr: The metadata notice as returned by Cellar.

        Returns:
            A ``Metadata`` whose ``publication_text`` is built from the
            ``TYPE`` and ``IDENTIFIER`` children of the first
            ``//NOTICE/EXPRESSION/URI`` element.

        Raises:
            ValueError: If the notice has no such ``URI`` element, if its
                ``TYPE`` or ``IDENTIFIER`` child is missing or empty, or if
                ``TYPE`` is not a known ``SystemName``.
        """
        # Parse from bytes: lxml rejects ``str`` input that carries an XML
        # encoding declaration.
        tree = etree.fromstring(xmlstr.encode("utf-8"))

        uris = tree.xpath("//NOTICE/EXPRESSION/URI")
        if not uris:
            raise ValueError(
                "No NOTICE/EXPRESSION/URI element found in metadata notice"
            )
        uri = uris[0]

        type_text = uri.findtext("TYPE")
        id_text = uri.findtext("IDENTIFIER")
        if not type_text or not id_text:
            raise ValueError(
                "NOTICE/EXPRESSION/URI is missing its TYPE or IDENTIFIER value"
            )

        return cls(
            publication_text=CellarIdentifier(
                system_name=SystemName(type_text),
                system_id=id_text,
            )
        )
|
||||
|
||||
|
||||
def extract_fmx_main_publication(zip_data: typing.BinaryIO) -> str:
    """Return the main document of a Formex 4 ZIP archive as text.

    Algorithm:
      1. Find the XML file containing the document descriptor
         (``*.doc.fmx.xml`` or ``*.doc.xml``); the first match is used.
      2. Parse it and locate the main publication reference
         (XPath ``/DOC/FMX/DOC.MAIN.PUB/REF.PHYS[@TYPE="DOC.XML"]``).
      3. Read the archive member named by the reference's ``FILE`` attribute.

    Args:
        zip_data: Seekable binary stream holding the ZIP archive.

    Returns:
        The content of the main publication file, decoded as UTF-8.

    Raises:
        ValueError: If the descriptor, the main publication reference, its
            ``FILE`` attribute, or the referenced archive member is missing.
        zipfile.BadZipFile: If ``zip_data`` is not a valid ZIP archive.
    """
    import posixpath  # ZIP member names always use "/" separators

    with zipfile.ZipFile(zip_data) as z:
        names = z.namelist()

        # Find the document descriptor XML file
        doc_xml_files = [
            f for f in names if f.endswith((".doc.fmx.xml", ".doc.xml"))
        ]
        if not doc_xml_files:
            logging.info("ZIP file contents: %s", names)
            raise ValueError(
                "No document descriptor (*.doc.xml / *.doc.fmx.xml) found in the archive"
            )

        doc_xml_file = doc_xml_files[0]

        # Parse the descriptor
        with z.open(doc_xml_file) as f:
            tree = etree.parse(f)

        # Find the main publication reference
        main_pub_ref = tree.xpath('/DOC/FMX/DOC.MAIN.PUB/REF.PHYS[@TYPE="DOC.XML"]')
        if not main_pub_ref:
            raise ValueError(
                "Main publication reference not found in document descriptor"
            )

        # Get the FILE attribute
        main_file = main_pub_ref[0].get("FILE")
        if not main_file:
            raise ValueError("FILE attribute not found in main publication reference")

        # Try FILE as a literal member name first, then relative to the
        # descriptor's directory, so archives that keep their files in a
        # sub-directory are handled as well.
        candidates = (
            main_file,
            posixpath.join(posixpath.dirname(doc_xml_file), main_file),
        )
        member = next((c for c in candidates if c in names), None)
        if member is None:
            logging.info("ZIP file contents: %s", names)
            raise ValueError(
                f"Main publication file {main_file!r} not found in the archive"
            )

        # Extract the file content
        with z.open(member) as f:
            return f.read().decode("utf-8")
|
||||
|
||||
|
||||
class CellarClient:
    """Small HTTP client for fetching notices and publications from Cellar.

    The client owns an ``httpx.Client``. Call ``close()`` when done, or use
    the instance as a context manager so the connection pool is released.
    """

    def __init__(self, language: Language = Language.ENG):
        """Create a client.

        Args:
            language: Default language sent as ``Accept-Language``.
        """
        self._client = httpx.Client(
            base_url="http://publications.europa.eu", follow_redirects=True
        )
        self.language = language

    def close(self) -> None:
        """Close the underlying HTTP client and its connections."""
        self._client.close()

    def __enter__(self) -> "CellarClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _get(
        self,
        identifier: CellarIdentifier,
        accept: str,
        language: Language | None = None,
    ) -> httpx.Response:
        """GET ``/resource/{system_name}/{system_id}`` with the given headers.

        Raises:
            httpx.HTTPStatusError: If the response status is 4xx or 5xx.
        """
        resp = self._client.get(
            f"/resource/{identifier.system_name}/{identifier.system_id}",
            headers={
                "Accept": accept,
                # Fall back to the client-wide language when no override is given.
                "Accept-Language": language if language is not None else self.language,
            },
        )
        resp.raise_for_status()
        return resp

    def metadata(
        self,
        cellar_id: CellarIdentifier,
        language: Language | None = None,
    ) -> Metadata:
        """Fetch metadata from Cellar.

        Args:
            cellar_id: Identifier of the resource.
            language: Optional override of the client's default language.

        Returns:
            The parsed metadata notice.
        """
        resp = self._get(cellar_id, "application/xml;notice=object", language)
        return Metadata.from_xml(resp.text)

    def publication_text(
        self,
        cellar_id: CellarIdentifier,
        content_type: ContentType,
        language: Language | None = None,
    ) -> str:
        """Fetch a publication from Cellar.

        The metadata notice is fetched first; the publication is then
        requested using the identifier found in that notice.

        Args:
            cellar_id: Identifier of the resource.
            content_type: Media type to request via the ``Accept`` header.
            language: Optional override of the client's default language.

        Returns:
            The publication text. When the response is a ZIP archive, the
            main Formex document is extracted from it; otherwise the
            response body is returned as-is.
        """
        metadata = self.metadata(cellar_id, language)
        resp = self._get(metadata.publication_text, content_type, language)

        if "zip" in resp.headers.get("Content-Type", ""):
            return extract_fmx_main_publication(io.BytesIO(resp.content))
        return resp.text
|
||||
|
||||
|
||||
def _get_path(
    base_path: Path,
    typ: typing.Literal["recital", "article", "annex"],
    num: str,
    extension: str,
) -> Path:
    """Get the path for a given type and number.

    Args:
        base_path: Output root directory.
        typ: Kind of element; selects the sub-directory
            (``recitals``, ``articles`` or ``annexes``).
        num: Element number, used as the file stem.
        extension: File extension including the leading dot (e.g. ``".html"``),
            or an empty string for no extension.

    Returns:
        ``base_path / <plural of typ> / <num><extension>``.

    Raises:
        KeyError: If ``typ`` is not one of the supported kinds.
        ValueError: If ``extension`` is non-empty and lacks a leading dot.
    """
    plurals = {
        "recital": "recitals",
        "article": "articles",
        "annex": "annexes",
    }

    if extension and not extension.startswith("."):
        raise ValueError(f"Invalid suffix {extension!r}")

    # Append the extension instead of using Path.with_suffix(): with_suffix()
    # replaces an existing suffix, which would turn "1.2" into "1.html".
    return base_path / plurals[typ] / f"{num}{extension}"
|
||||
|
||||
|
||||
class HtmlConverter:
    """Convert a Formex 4 XML document to HTML files"""

    # Suffix of every file written by this converter.
    extension = ".html"

    def __init__(self, fmx4_content: str, outdir: Path):
        """Parse the document and make sure the output directory exists.

        Args:
            fmx4_content: The Formex 4 document as a string.
            outdir: Directory below which the files are written; created
                if it does not exist.
        """
        self._fmx4 = fmx4_content
        # Parse from bytes: lxml rejects ``str`` input that carries an XML
        # encoding declaration.
        self._xml = etree.fromstring(self._fmx4.encode("utf-8"))
        self._outdir = outdir
        self._outdir.mkdir(parents=True, exist_ok=True)

    def convert(self) -> None:
        """Split the publication text into separate files.

        Structure:
        - recitals/: All recitals, one per file
        - articles/: All articles, one per file

        Annexes are not written yet.
        """
        self._write_recitals()
        self._write_articles()

    def _write_recitals(self) -> None:
        """Write the text of every ``//GR.CONSID/CONSID/NP`` element to its own file."""
        for recital in self._xml.xpath("//GR.CONSID/CONSID/NP"):
            num_str = recital.findtext("NO.P")
            if not num_str:
                logging.warning("Skipping recital without a number (NO.P)")
                continue
            # Recital numbers are written like "(12)"; drop the parentheses.
            num = num_str.strip("()")

            filename = _get_path(self._outdir, "recital", num, self.extension)
            filename.parent.mkdir(parents=True, exist_ok=True)

            txt_elem = recital.find("TXT")
            txt = formex4.text_content(txt_elem) if txt_elem is not None else None
            if txt is None:
                logging.warning("Recital %s has no text", num)
                continue
            filename.write_text(txt, encoding="utf-8")

    def _write_articles(self) -> None:
        """Convert every ``//ARTICLE`` element and write it to its own file."""
        for article in self._xml.xpath("//ARTICLE"):
            identifier = article.get("IDENTIFIER")
            if not identifier:
                logging.warning("Skipping article without an IDENTIFIER attribute")
                continue
            # Strip zero padding ("001" -> "1"), but keep an all-zero
            # identifier as-is so the file name is never empty.
            num = identifier.lstrip("0") or identifier

            filename = _get_path(self._outdir, "article", num, self.extension)
            filename.parent.mkdir(parents=True, exist_ok=True)

            txt = formex4.FormexArticleConverter().convert_article(article)
            filename.write_text(txt, encoding="utf-8")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Demo run: download one regulation and split it into files under ./tmp.
    logging.basicConfig(level=logging.INFO)

    # Cyber Resilience Act - CELEX 32024R2847
    cra_id = CellarIdentifier(system_name=SystemName.CELEX, system_id="32024R2847")

    # AI Act - CELEX 32024R1689 (kept as a ready-made alternative input)
    aia_id = CellarIdentifier(system_name=SystemName.CELEX, system_id="32024R1689")

    content_type = ContentType.ZIP_FMX4
    language = Language.ENG

    client = CellarClient(language=language)

    tmpdir = Path("tmp")
    tmpdir.mkdir(parents=True, exist_ok=True)

    # The language is already configured on the client, so it is not passed
    # again here.
    fmx4_text = client.publication_text(cra_id, content_type)
    converter = HtmlConverter(fmx4_text, tmpdir)
    converter.convert()
|
||||
Reference in New Issue
Block a user