Build a Custom Web File Retriever: Step-by-Step Guide

A web file retriever is a tool or service that fetches files from remote web sources (HTTP/S, cloud storage APIs, FTP, etc.) and delivers them to a user, system, or pipeline. This guide walks you through building a simple, robust, and extensible web file retriever using widely available technologies. The design is language-agnostic, with concrete examples in Python. By the end you’ll have a retriever that can download files, validate and store them, handle retries and rate limits, and be extended to support authentication, parallelism, and additional storage backends.


Overview and design goals

Key goals:

  • Reliability: retries, resumable downloads, integrity checks.
  • Security: TLS, credential handling, safe temp storage.
  • Extensibility: plugins for protocols (S3, FTP, APIs).
  • Observability: logging, metrics, and error reporting.
  • Efficiency: parallel downloads, bandwidth control, caching.

High-level components:

  1. Fetcher — protocol-specific download logic.
  2. Validator — checks content-type, size, and integrity (checksum).
  3. Storage — local filesystem, object store, or CDN.
  4. Orchestrator — coordinates downloads, retries, backoff, concurrency.
  5. API/CLI — user interface to request retrieval and monitor status.
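
To make these responsibilities concrete, the sketch below expresses them as minimal Python interfaces. The Protocol names are illustrative only; the concrete pieces built in the following steps use plain functions and classes with the same shapes.

# A minimal sketch of the component contracts (illustrative; not part of the package).
from typing import Protocol, Tuple


class Fetcher(Protocol):
    def fetch(self, url: str, dest_dir: str) -> Tuple[str, str]:
        """Download `url` and return (local_path, sha256_hex)."""


class Validator(Protocol):
    def validate(self, path: str) -> None:
        """Raise if the file fails size, MIME, or checksum checks."""


class Storage(Protocol):
    def store(self, src_path: str, dest_key: str) -> str:
        """Persist the file and return its stored location or URL."""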

Technology choices (example stack)

  • Language: Python 3.10+ (alternatively Go or Node.js for performance).
  • HTTP client: requests or httpx (httpx supports async).
  • Async runtime: asyncio (for concurrent downloads).
  • Retries/backoff: tenacity or custom exponential backoff.
  • Storage: local disk for prototypes, AWS S3 / MinIO for production.
  • Integrity: SHA256 checksums.
  • Logging/metrics: structlog + Prometheus exporter.
  • Containerization: Docker.
  • CI: GitHub Actions.

Step 1 — Project layout

Example structure:

retriever/
├─ retriever/
│  ├─ __init__.py
│  ├─ orchestrator.py
│  ├─ fetchers/
│  │  ├─ __init__.py
│  │  ├─ http_fetcher.py
│  │  └─ s3_fetcher.py
│  ├─ validators.py
│  ├─ storage.py
│  └─ cli.py
├─ tests/
├─ Dockerfile
├─ pyproject.toml
└─ README.md

Step 2 — Core fetcher (HTTP example)

Below is a minimal, production-minded HTTP fetcher using httpx, with streaming download, retries, and SHA256 calculation.

# retriever/fetchers/http_fetcher.py
import hashlib
import os
import tempfile
from typing import Tuple

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type


class FetchError(Exception):
    pass


@retry(stop=stop_after_attempt(5),
       wait=wait_exponential(multiplier=1, min=1, max=10),
       retry=retry_if_exception_type((httpx.HTTPError, FetchError)))
def fetch_http(url: str, dest_dir: str = "/tmp") -> Tuple[str, str]:
    """
    Streams a file from `url` to a temp file in dest_dir.
    Returns (filepath, sha256_hex).
    """
    headers = {"User-Agent": "custom-web-file-retriever/1.0"}
    timeout = httpx.Timeout(30.0, connect=10.0)
    with httpx.stream("GET", url, headers=headers, timeout=timeout, follow_redirects=True) as response:
        if response.status_code != 200:
            raise FetchError(f"Bad status: {response.status_code}")
        sha256 = hashlib.sha256()
        fd, tmp_path = tempfile.mkstemp(dir=dest_dir)
        os.close(fd)
        with open(tmp_path, "wb") as f:
            for chunk in response.iter_bytes(chunk_size=1024 * 64):
                if not chunk:
                    break
                f.write(chunk)
                sha256.update(chunk)
    return tmp_path, sha256.hexdigest()

Step 3 — Validators

Validate content-length, MIME type, max size, and checksum.

# retriever/validators.py
import hashlib
import os
from typing import Optional

import magic  # python-magic wrapper around libmagic


class ValidationError(Exception):
    pass


def validate_file(path: str,
                  max_size_bytes: int = 200 * 1024 * 1024,
                  allowed_mime_prefix: Optional[str] = None,
                  expected_sha256: Optional[str] = None) -> None:
    size = os.path.getsize(path)
    if size > max_size_bytes:
        raise ValidationError("File too large")
    mime = magic.from_file(path, mime=True)
    if allowed_mime_prefix and not mime.startswith(allowed_mime_prefix):
        raise ValidationError(f"Unexpected MIME: {mime}")
    if expected_sha256:
        h = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                h.update(chunk)
        if h.hexdigest() != expected_sha256:
            raise ValidationError("Checksum mismatch")

Step 4 — Storage backends

Abstract the storage layer so you can swap between local disk, S3, or another backend.

# retriever/storage.py
from abc import ABC, abstractmethod
import os
import shutil


class StorageBackend(ABC):
    @abstractmethod
    def store(self, src_path: str, dest_key: str) -> str:
        """Store file; return stored location/URL."""


class LocalStorage(StorageBackend):
    def __init__(self, base_dir: str):
        os.makedirs(base_dir, exist_ok=True)
        self.base_dir = base_dir

    def store(self, src_path: str, dest_key: str) -> str:
        dst = os.path.join(self.base_dir, dest_key)
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.move(src_path, dst)
        return dst

For S3, use boto3 to upload the file and return its S3 URL, as sketched below.
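
A sketch of such a backend, reusing the StorageBackend ABC above and assuming boto3 is installed and AWS credentials are configured (the module name, key prefix, and returned s3:// URL format are illustrative):

# retriever/storage_s3.py (illustrative module name)
import boto3

from .storage import StorageBackend


class S3Storage(StorageBackend):
    def __init__(self, bucket: str, prefix: str = ""):
        self.bucket = bucket
        self.prefix = prefix
        self.client = boto3.client("s3")

    def store(self, src_path: str, dest_key: str) -> str:
        key = f"{self.prefix}{dest_key}"
        # upload_file streams from disk and handles multipart uploads for large files
        self.client.upload_file(src_path, self.bucket, key)
        return f"s3://{self.bucket}/{key}"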


Step 5 — Orchestrator with concurrency and retries

Use asyncio to run multiple downloads concurrently and to coordinate validation and storage.

# retriever/orchestrator.py
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List

from .fetchers.http_fetcher import fetch_http
from .validators import validate_file
from .storage import LocalStorage

executor = ThreadPoolExecutor(max_workers=8)


async def retrieve_urls(urls: List[str], dest_dir: str, storage: LocalStorage):
    loop = asyncio.get_running_loop()
    results = []
    sem = asyncio.Semaphore(5)  # limit concurrent downloads

    async def worker(url):
        async with sem:
            # run blocking fetch in the thread pool
            path, sha = await loop.run_in_executor(executor, fetch_http, url, dest_dir)
            # validate (also blocking, so run in a thread)
            await loop.run_in_executor(executor, validate_file, path)
            stored = await loop.run_in_executor(executor, storage.store, path, os.path.basename(path))
            return {"url": url, "sha256": sha, "stored": stored}

    tasks = [asyncio.create_task(worker(u)) for u in urls]
    for t in asyncio.as_completed(tasks):
        results.append(await t)
    return results

Step 6 — CLI and API

Provide a small CLI for single-shot retrievals and a simple HTTP API for programmatic usage.

Example CLI (click):

# retriever/cli.py
import asyncio

import click

from .storage import LocalStorage
from .orchestrator import retrieve_urls


@click.command()
@click.argument("urls", nargs=-1)
@click.option("--out", default="/tmp/retriever")
def main(urls, out):
    storage = LocalStorage(out)
    results = asyncio.run(retrieve_urls(list(urls), "/tmp", storage))
    for r in results:
        print(r)


if __name__ == "__main__":
    main()

For an API, use FastAPI with endpoints to submit retrieval jobs, check their status, and download stored files; a minimal sketch follows.
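
This sketch reuses the orchestrator and local storage from the previous steps. The module name, endpoint paths, request model, and in-memory job store are illustrative assumptions; a production service would persist jobs in a queue or database.

# retriever/api.py (illustrative module name)
import asyncio
import uuid

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from .storage import LocalStorage
from .orchestrator import retrieve_urls

app = FastAPI()
storage = LocalStorage("/tmp/retriever")
jobs: dict[str, dict] = {}  # in-memory job store; swap for a queue/DB in production


class RetrieveRequest(BaseModel):
    urls: list[str]


@app.post("/jobs")
async def submit_job(req: RetrieveRequest):
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "running", "results": None}

    async def run():
        try:
            jobs[job_id]["results"] = await retrieve_urls(req.urls, "/tmp", storage)
            jobs[job_id]["status"] = "done"
        except Exception as exc:
            jobs[job_id].update(status="failed", error=str(exc))

    asyncio.create_task(run())  # fire-and-forget; a real service would track the task
    return {"job_id": job_id}


@app.get("/jobs/{job_id}")
async def job_status(job_id: str):
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Unknown job")
    return jobs[job_id]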


Step 7 — Advanced features

  • Resumable downloads (HTTP Range): store progress and resume interrupted transfers via Range requests (see the sketch after this list).
  • Authentication: support OAuth2, API keys, signed URLs. Store credentials in a secrets manager.
  • Rate limiting & politeness: per-host concurrency and delays to avoid bans.
  • Deduplication & caching: use content-addressed storage (CAS) by SHA256.
  • Virus scanning: integrate ClamAV or a cloud malware scan.
  • Monitoring: Prometheus metrics for success/failure, latency, throughput; alerts for failure rates.
  • Work queue: Use Redis/RQ, Celery, or Kafka for distributed retrieval jobs.
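
For the first item, the sketch below resumes a partial download with an HTTP Range request. The helper name resume_fetch and the single-file progress model are illustrative; it treats the partially downloaded file itself as the progress record.

# A minimal sketch of resuming a download via HTTP Range requests (illustrative helper).
import os

import httpx


def resume_fetch(url: str, part_path: str) -> None:
    """Append the missing tail of `url` to a partially downloaded file."""
    offset = os.path.getsize(part_path) if os.path.exists(part_path) else 0
    headers = {"Range": f"bytes={offset}-"}
    with httpx.stream("GET", url, headers=headers, follow_redirects=True) as response:
        if response.status_code == 206:   # Partial Content: server honoured the Range header
            mode = "ab"
        elif response.status_code == 200:  # server ignored Range; start over from scratch
            mode = "wb"
        else:
            raise RuntimeError(f"Bad status: {response.status_code}")
        with open(part_path, mode) as f:
            for chunk in response.iter_bytes(chunk_size=64 * 1024):
                f.write(chunk)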

Security and operational considerations

  • Always use HTTPS and verify certs.
  • Run fetchers in isolated workers or containers.
  • Limit disk usage and run periodic cleanup.
  • Rotate credentials and use least privilege for storage.
  • Sanitize filenames and avoid path traversal when writing to disk (see the sketch after this list).
  • Rate-limit retries to avoid overwhelming (effectively DoSing) remote hosts.
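
For the filename point, here is a small sketch of a sanitizer that reduces an untrusted name to a single safe path component. The helper name safe_dest_key is illustrative; its output is what you would pass as the dest_key to a storage backend.

# A minimal filename sanitisation sketch (illustrative helper).
import os
import re


def safe_dest_key(raw_name: str) -> str:
    """Reduce an untrusted name to a single, traversal-free path component."""
    name = os.path.basename(raw_name.replace("\\", "/"))  # drop any directory part
    name = re.sub(r"[^A-Za-z0-9._-]", "_", name)          # allow only a conservative charset
    name = name.lstrip(".")                               # avoid hidden, ".", or ".." names
    return name or "unnamed"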

Testing and CI

  • Unit test fetchers with recorded HTTP responses (VCR.py or responses).
  • Integration tests against local S3 (MinIO) and local test web servers (see the sketch after this list).
  • Fuzz large and malformed responses to ensure robustness.
  • Add linting, type checking (mypy), and security scans.
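
A minimal pytest sketch for the local-test-web-server point: it spins up a throwaway HTTP server from the standard library and asserts that fetch_http returns a matching checksum. The file names and fixture layout are illustrative.

# tests/test_http_fetcher.py (illustrative)
import hashlib
import http.server
import threading
from functools import partial

import pytest

from retriever.fetchers.http_fetcher import fetch_http


@pytest.fixture()
def local_server(tmp_path):
    # Serve a small fixture file from a temporary directory on a random free port.
    payload = b"hello retriever"
    (tmp_path / "sample.bin").write_bytes(payload)
    handler = partial(http.server.SimpleHTTPRequestHandler, directory=str(tmp_path))
    server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    yield f"http://127.0.0.1:{server.server_address[1]}", payload
    server.shutdown()


def test_fetch_http_returns_matching_checksum(local_server, tmp_path):
    base_url, payload = local_server
    path, sha = fetch_http(f"{base_url}/sample.bin", dest_dir=str(tmp_path))
    assert sha == hashlib.sha256(payload).hexdigest()
    with open(path, "rb") as f:
        assert f.read() == payload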

Deployment

  • Containerize with Docker; use environment variables for configuration.
  • Use Kubernetes for scaling; each worker pulls jobs from a queue.
  • Use autoscaling policies based on queue depth and network throughput.

Example usage scenarios

  • Ingesting daily data dumps from partner websites.
  • Fetching user-submitted file URLs for processing (images, logs).
  • Migrating files from legacy FTP servers to cloud storage.
  • On-demand retrieval for previewing external assets.

Conclusion

This step-by-step guide gives a practical blueprint to build a custom web file retriever that balances reliability, security, and extensibility. Start with the simple HTTP fetcher and local storage, then add validators, authentication, resumable transfers, and production-grade orchestration as your needs grow.
