Triggering GCP Cloud Functions on New Shapefile Uploads
To reliably process spatial data in Google Cloud, deploy a 2nd gen Cloud Function with a google.cloud.storage.object.v1.finalized trigger on the upload bucket and have the handler act only on .shp objects. Eventarc storage triggers filter by bucket and event type, not by file extension, so the extension check lives in the function code. Because shapefiles are inherently multi-file bundles (.shp, .shx, .dbf, plus optional .prj, .cpg), the function should treat the primary geometry file as its entry point, programmatically verify that the companion files exist under the same directory prefix, and enforce idempotent state checks before invoking geospatial libraries like GDAL or Fiona. This architecture prevents partial dataset ingestion, aligns with Event-Driven Geospatial Processing Patterns where file-type routing and state validation are mandatory, and scales cleanly across high-throughput spatial ETL pipelines.
1. Event Routing & Bundle Validation Logic
Google Cloud Storage emits object lifecycle events per individual file, not per logical dataset. Acting on a .shx or .dbf upload risks a race condition in which the geometry file has not arrived yet. By acting only on .shp uploads, you establish a deterministic entry point. The function then:
- Extracts the base filename and directory prefix from the event payload.
- Lists the bucket prefix to confirm all required extensions (.shp, .shx, .dbf) exist.
- Skips execution if any required companion is missing. Because events for other extensions are ignored, a skipped bundle is only picked up again when the .shp is uploaded last or re-uploaded, so have uploaders write the .shp after its companions.
This prefix-scanning approach avoids complex metadata tagging and relies on Cloud Storage's strongly consistent object listing: a companion that has finished uploading is immediately visible to the check. For teams evaluating multi-cloud ingestion strategies, the same validation logic applies when mapping S3 and GCS Event Triggers for Shapefiles to unified data pipelines.
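The validation steps above can be sketched as a pure function that takes the triggering object name and the names returned by a prefix listing. This is a minimal sketch rather than the handler itself: `check_bundle` and `BundleStatus` are hypothetical names, and in a real function the `listed_names` argument would presumably come from a prefix listing of the bucket.

```python
import posixpath
from dataclasses import dataclass, field

REQUIRED_EXTS = (".shp", ".shx", ".dbf")
OPTIONAL_EXTS = (".prj", ".cpg")


@dataclass
class BundleStatus:
    stem: str  # object name without its extension, directory prefix included
    missing: list = field(default_factory=list)
    optional_present: list = field(default_factory=list)

    @property
    def complete(self) -> bool:
        return not self.missing


def check_bundle(object_name, listed_names):
    """Return a BundleStatus for a .shp object, or None for any other object."""
    stem, ext = posixpath.splitext(object_name)
    if ext.lower() != ".shp":
        return None
    # Collect the extensions of every listed object that shares the exact stem.
    present = set()
    for name in listed_names:
        other_stem, other_ext = posixpath.splitext(name)
        if other_stem == stem:
            present.add(other_ext.lower())
    return BundleStatus(
        stem=stem,
        missing=[e for e in REQUIRED_EXTS if e not in present],
        optional_present=[e for e in OPTIONAL_EXTS if e in present],
    )
```

A handler could return early whenever `check_bundle(...)` is `None` or its `complete` property is false, and log the `missing` list for diagnosis.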
2. Deployment & IAM Configuration
Deploy the function using the gcloud CLI with Eventarc-backed 2nd gen parameters. The --trigger-event-filters flag declares the Eventarc trigger directly, so you do not need to create and manage a Pub/Sub notification topic yourself.
gcloud functions deploy process-shapefile \
--gen2 \
--runtime=python311 \
--region=us-central1 \
--source=. \
--entry-point=process_shapefile \
--trigger-event-filters="bucket=YOUR_GCS_BUCKET,type=google.cloud.storage.object.v1.finalized" \
--trigger-location=us-central1 \
--memory=1Gi \
--timeout=540s \
--service-account=YOUR_CF_SA@PROJECT_ID.iam.gserviceaccount.com
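IAM Scoping: For data access, the function's runtime service account needs only roles/storage.objectViewer and roles/storage.objectCreator on the target bucket. Avoid roles/storage.admin in production. If the function writes to a separate staging bucket, grant objectCreator on that destination only. Event delivery has its own requirements: the trigger's service account typically needs roles/eventarc.eventReceiver and roles/run.invoker, and the Cloud Storage service agent needs roles/pubsub.publisher on the project. For detailed event routing architecture, review the official Google Cloud Storage Pub/Sub notifications documentation.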
3. Production-Ready Python Handler
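The following handler validates the bundle structure, downloads it to a temporary directory, runs a lightweight Fiona validation, and enforces idempotency via a .processed marker. The temporary filesystem is memory-backed, so the size of the downloaded bundle counts against the function's memory limit.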
import logging
import os
import posixpath
import tempfile

import fiona
import functions_framework
from cloudevents.http import CloudEvent
from google.cloud import storage

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

storage_client = storage.Client()

REQUIRED_EXTS = (".shp", ".shx", ".dbf")
OPTIONAL_EXTS = (".prj", ".cpg", ".sbn", ".sbx")
PROCESSED_MARKER = ".processed"


@functions_framework.cloud_event
def process_shapefile(event: CloudEvent) -> None:
    data = event.data
    bucket_name = data["bucket"]
    object_name = data["name"]

    # 1. Filter trigger to primary geometry file
    if not object_name.lower().endswith(".shp"):
        logger.info(f"Ignoring non-.shp upload: {object_name}")
        return

    bucket = storage_client.bucket(bucket_name)
    # Object names always use "/" separators. The stem keeps the directory
    # prefix, so root-level objects do not gain a spurious "./" prefix.
    stem = posixpath.splitext(object_name)[0]
    base_name = posixpath.basename(stem)
    marker_path = f"{stem}{PROCESSED_MARKER}"

    # 2. Idempotency check
    if bucket.blob(marker_path).exists():
        logger.info(f"Skipping already processed bundle: {base_name}")
        return

    # 3. Verify required companions exist (extensions are assumed lowercase)
    required_paths = [f"{stem}{ext}" for ext in REQUIRED_EXTS]
    missing = [p for p in required_paths if not bucket.blob(p).exists()]
    if missing:
        logger.warning(f"Missing required files for {base_name}: {missing}")
        return

    # Optional sidecars such as .prj are fetched when present so the CRS is readable
    optional_paths = [f"{stem}{ext}" for ext in OPTIONAL_EXTS]
    optional_paths = [p for p in optional_paths if bucket.blob(p).exists()]

    # 4. Download bundle to ephemeral storage
    with tempfile.TemporaryDirectory() as tmp_dir:
        for blob_name in required_paths + optional_paths:
            local_path = os.path.join(tmp_dir, posixpath.basename(blob_name))
            bucket.blob(blob_name).download_to_filename(local_path)

        # 5. Validate geometry & attributes with Fiona
        shp_path = os.path.join(tmp_dir, f"{base_name}.shp")
        try:
            with fiona.open(shp_path, "r") as src:
                feature_count = len(src)
                crs = src.crs
            logger.info(f"Validated {base_name}: {feature_count} features, CRS={crs}")
        except fiona.errors.DriverError as e:
            logger.error(f"GDAL/Fiona validation failed for {base_name}: {e}")
            raise

    # 6. Mark as processed & trigger downstream routing
    bucket.blob(marker_path).upload_from_string("processed")
    logger.info(f"Successfully processed {base_name}. Ready for downstream routing.")
4. Idempotency & Retry Strategy
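Cloud Functions 2nd gen delivers events at least once. Network blips, retried deliveries, or a re-uploaded .shp can cause duplicate invocations. The .processed marker file acts as a lightweight completion flag. Checking for the marker and writing it are separate steps, so two overlapping invocations can both pass the check. For production systems, consider replacing the marker with GCS object metadata (metadata={"status": "processed"}) to avoid cluttering the bucket with auxiliary files; updating metadata requires update permission on the object, which objectViewer and objectCreator do not grant.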
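One way to make the marker behave more like a lock is to create it with a generation precondition, so that only the first writer succeeds. The sketch below assumes the `if_generation_match` parameter of `Blob.upload_from_string` in google-cloud-storage, and assumes that a lost race surfaces as an exception whose `code` is 412 (as `google.api_core.exceptions.PreconditionFailed` does). `try_claim_marker` is a hypothetical helper name.

```python
def try_claim_marker(marker_blob, payload="processing"):
    """Create the marker only if it does not exist yet.

    `marker_blob` is expected to behave like google.cloud.storage.Blob.
    Returns True if this invocation created the marker, and False if
    another invocation created it first.
    """
    try:
        # if_generation_match=0 asks Cloud Storage to reject the write
        # when a live generation of the object already exists.
        marker_blob.upload_from_string(payload, if_generation_match=0)
        return True
    except Exception as exc:
        # Assumption: a failed precondition carries HTTP status 412. With
        # google-api-core imported, catching PreconditionFailed directly
        # would be the more explicit choice.
        if getattr(exc, "code", None) == 412:
            return False
        raise
```

If the marker is claimed before processing starts, a failed run leaves it behind. A real pipeline would then need either delete permission to release the claim or a separate record of completion.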
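Additionally, wrap transient steps such as the bundle download in a retry decorator with exponential backoff. Validation failures caused by corrupted headers are deterministic, so log and surface them rather than retrying; a missing projection file is likewise not fixed by retrying. Platform-level retries for event-driven functions can be enabled with the --retry flag during deployment, as outlined in the official Cloud Functions documentation. With retries enabled, any exception the handler raises causes the event to be redelivered.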
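A retry decorator with exponential backoff can be sketched with the standard library alone. The name `retry_with_backoff` and its parameters are illustrative rather than part of any Google library, and the `sleep` argument exists only so the delay can be replaced in tests. Keep the total of all delays well below the function timeout.

```python
import functools
import random
import time


def retry_with_backoff(exceptions, max_attempts=4, base_delay=0.5,
                       max_delay=8.0, sleep=time.sleep):
    """Retry the wrapped callable on `exceptions` with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    # The delay doubles on each attempt, is capped at
                    # max_delay, and is scaled by random jitter.
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    sleep(delay * random.uniform(0.5, 1.0))
        return wrapper
    return decorator
```

Only exception types listed in `exceptions` are retried; anything else propagates immediately, which keeps deterministic failures such as a corrupt file from being retried.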
5. Cross-Cloud & Pipeline Integration
Once validated, the function should route data to downstream systems:
- BigQuery: Use pandas + geopandas to convert to GeoJSON/Parquet, then load via google-cloud-bigquery.
- Cloud SQL/PostGIS: Stream validated features using psycopg2 with ST_GeomFromGeoJSON.
- Data Catalog: Register the dataset schema for governance and lineage tracking.
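For the PostGIS route, one possible shape is to turn each validated feature into a parameterized INSERT that passes the geometry as GeoJSON text to ST_GeomFromGeoJSON. This is a sketch under assumptions: the table `staging.features`, its `geom` and `props` columns, the helper name `feature_to_insert`, and SRID 4326 are all hypothetical. Features are assumed to have been converted to plain GeoJSON-like dicts beforehand.

```python
import json

# psycopg2-style "%s" placeholders; table and column names are hypothetical.
INSERT_SQL = (
    "INSERT INTO staging.features (geom, props) "
    "VALUES (ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326), %s::jsonb)"
)


def feature_to_insert(feature):
    """Return (sql, params) for one GeoJSON-like feature dict."""
    geometry = feature.get("geometry")
    if geometry is None:
        raise ValueError("feature has no geometry")
    params = (
        json.dumps(geometry),                         # geometry as GeoJSON text
        json.dumps(feature.get("properties") or {}),  # attributes as JSON
    )
    return INSERT_SQL, params
```

With psycopg2, the returned pair would be passed to `cursor.execute(sql, params)`, which keeps attribute values out of the SQL string itself.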
Because shapefile validation logic is cloud-agnostic, this pattern scales seamlessly when replicating ingestion across AWS S3 or Azure Blob Storage. Maintain consistent prefix conventions, enforce strict IAM boundaries, and centralize retry/idempotency logic to ensure reliable spatial ETL at scale.