import collections
from datetime import datetime
import logging
import numpy
import os
import pandas
from pandas.errors import EmptyDataError
import string
import time
import re
import warnings
from .numerical import mean_stddev, get_numerical_ranges
from .profile_types import identify_types, determine_dataset_type
from .spatial import (
LatLongColumn,
Geohasher,
nominatim_resolve_all,
pair_latlong_columns,
get_spatial_ranges,
parse_wkt_column,
GeoClassifier,
HybridGeoClassifier,
)
from .temporal import get_temporal_resolution
from . import types
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Mapping from GeoClassifier labels to (structural_type, [semantic_types]).
# Only spatial labels are listed: process_column() looks the predicted label
# up here, and any label that is absent (e.g. "non_spatial") falls through to
# the regular identify_types() workflow.
GEO_CLASSIFIER_SPATIAL_MAP = {
    # Lat/Long coordinates
    "latitude": (types.FLOAT, [types.LATITUDE]),
    "longitude": (types.FLOAT, [types.LONGITUDE]),
    # Projected coordinates (e.g., State Plane, UTM): numeric, but no semantic
    # type is attached to them
    "x_coord": (types.FLOAT, []),
    "y_coord": (types.FLOAT, []),
    # WKT geometry types
    # NOTE(review): line geometries are mapped to GEO_POLYGON as well --
    # confirm this is intended rather than a missing dedicated type.
    "point": (types.GEO_POINT, []),
    "polygon": (types.GEO_POLYGON, []),
    "multi-polygon": (types.GEO_POLYGON, []),
    "line": (types.GEO_POLYGON, []),
    "multi-line": (types.GEO_POLYGON, []),
    # Address-related
    "zip5": (types.TEXT, [types.ADDRESS]),
    "zip9": (types.TEXT, [types.ADDRESS]),
    "address": (types.TEXT, [types.ADDRESS]),
    # Administrative areas
    "borough": (
        types.TEXT,
        [types.ADMIN],
    ),  # Named boroughs (e.g., "Brooklyn", "Queens")
    "borough_code": (types.TEXT, [types.ADMIN]),  # Borough codes (numeric/alphanumeric)
    "city": (types.TEXT, [types.ADMIN]),
    "state": (types.TEXT, [types.ADMIN]),
    "state_code": (types.TEXT, [types.ADMIN]),
    "country": (types.TEXT, [types.ADMIN]),
    # NYC-specific identifiers (spatial context)
    "bbl": (types.INTEGER, [types.ID]),
    "bin": (types.INTEGER, [types.ID]),
}
# Seed used for every random draw in this module (row sampling in load_data()
# and the sample rows picked by process_dataset()), so results are repeatable.
RANDOM_SEED = 89
# Default value of `load_max_size`: target size, in bytes, of the data that
# gets profiled.
MAX_SIZE = 5000000  # 5 MB
# Maximum number of rows included in metadata["sample"].
SAMPLE_ROWS = 20
# Highest fraction of non-empty values that Nominatim may fail to resolve for
# a text column to still be tagged as an address column.
MAX_UNCLEAN_ADDRESSES = 0.20  # 20%
# NOTE(review): MAX_SKIPPED_ROWS and HEADER_CONSISTENT_ROWS are not referenced
# by the code visible in this file -- confirm they are still needed.
MAX_SKIPPED_ROWS = 6
"""Maximum number of rows to discard at the top of the file"""
HEADER_CONSISTENT_ROWS = 4
"""Stop throwing out lines when that many in a row have same number of columns
"""
# Passed as `number` to every Geohasher built for spatial coverage.
MAX_GEOHASHES = 100
# Splits text on runs of non-word characters; used for the text histogram.
_re_word_split = re.compile(r"\W+")
def truncate_string(s, limit=140):
    """Truncate a string, replacing characters over the limit with "...".

    The cut is moved back to the last space found in the 17 characters that
    precede the ellipsis, when there is one, so that words are not split.

    :param s: the string to shorten
    :param limit: maximum length of the returned string
    :return: ``s`` itself if it fits, otherwise a string of at most ``limit``
        characters ending with "..." (or only dots if ``limit`` is under 3)
    """
    if len(s) <= limit:
        return s
    if limit <= 3:
        # No room for any text in front of the ellipsis; slicing with
        # `limit - 3` would be negative and return almost the whole string
        return "..."[: max(limit, 0)]

    cut = limit - 3
    # Clamp the start of the search window: a negative start would be
    # interpreted by rfind() as an offset from the END of the string
    space = s.rfind(" ", max(limit - 20, 0), cut)
    if space == -1:
        return s[:cut] + "..."
    return s[:space] + "..."
# Characters that separate words in an attribute name
DELIMITERS = set(string.punctuation) | set(string.whitespace)
UPPER = set(string.ascii_uppercase)
LOWER = set(string.ascii_lowercase)


def expand_attribute_name(name):
    """Expand an attribute name into the keywords derived from it.

    The name is split on punctuation and whitespace (which covers "_" and
    "-"), on every change between digits and non-digits, and on every
    lowercase-to-uppercase transition (camelCase). For example,
    ``"fooBar_baz42"`` yields ``"foo"``, ``"Bar"``, ``"baz"``, ``"42"``.

    This is a generator. Empty keywords are never produced, so an empty name
    or a name ending with a delimiter does not yield a trailing ``""``.

    :param name: the attribute (column) name
    :return: an iterator over the keywords, in order of appearance
    """
    word = []
    for c in name:
        if c in DELIMITERS:
            # A delimiter ends the current word and is itself dropped
            if word:
                yield "".join(word)
                word = []
            continue
        if word:
            prev = word[-1]
            digit_boundary = (prev in string.digits) != (c in string.digits)
            camel_boundary = prev in LOWER and c in UPPER
            if digit_boundary or camel_boundary:
                yield "".join(word)
                word = []
        word.append(c)
    # Only flush a pending word; yielding unconditionally produced "" here
    if word:
        yield "".join(word)
class NoOpContext:
    """Context manager that does nothing, kept for compatibility.

    Entering it gives back the instance itself; leaving it never suppresses
    an exception raised inside the ``with`` body.
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # A false return value lets any in-flight exception propagate
        suppress = False
        return suppress
def load_data(data, load_max_size=None, indexes=True):
    """Load data from a file path, a file object, or a DataFrame.

    Files bigger than ``load_max_size`` are only read up to an estimated
    number of rows; the resulting DataFrame is then randomly sampled if its
    in-memory size still exceeds ``load_max_size``.

    :param data: path (``str``, ``bytes`` or ``os.PathLike``), seekable file
        object, or ``pandas.DataFrame``
    :param load_max_size: target size in bytes of the data to profile,
        defaults to ``MAX_SIZE``
    :param indexes: if True, a non-default index is turned into regular
        columns with ``reset_index()``
    :return: a 4-tuple ``(data, metadata, columns, full_data_stats)``:
        the all-string DataFrame to profile; a dict with ``nb_rows``,
        ``nb_profiled_rows`` and, for files, ``size``; the DataFrame's
        columns; and a dict mapping each column to its
        ``num_distinct_values`` and up to 3 ``sample_values``. The statistics
        are computed on the rows that were loaded, before random sampling
        (for a truncated file that is not the whole file).
    :raises ValueError: if the path does not exist
    :raises TypeError: if ``data`` is of an unsupported type
    """
    if load_max_size is None:
        load_max_size = MAX_SIZE

    metadata = {}
    full_data_stats = {}

    # Step 1: Convert file path/file object to DataFrame
    if isinstance(data, (str, bytes, os.PathLike)):
        # fsdecode() handles bytes and PathLike; str(b"x") would give "b'x'"
        path = os.fsdecode(data)
        if not os.path.exists(path):
            raise ValueError("data file does not exist")
        metadata["size"] = os.path.getsize(path)
        logger.info("File size: %r bytes", metadata["size"])

        # Detect separator from extension
        sep = "\t" if path.endswith(".tsv") else ","

        # For large files, estimate rows to load based on target size
        nrows = None
        if metadata["size"] > load_max_size:
            # Measure the bytes actually taken by (up to) the first 1000
            # lines after the header. Dividing the whole file size by the
            # number of sampled rows would wildly overestimate the row size.
            # Physical lines are counted, so this is only an estimate when
            # quoted fields contain newlines.
            sampled_rows = 0
            sampled_bytes = 0
            with open(path, "rb") as fp:
                fp.readline()  # Skip the header
                for line in fp:
                    sampled_bytes += len(line)
                    sampled_rows += 1
                    if sampled_rows >= 1000:
                        break
            avg_row_size = sampled_bytes / sampled_rows if sampled_rows else 100
            nrows = int(load_max_size / avg_row_size)
            logger.info("Large file, loading ~%d rows (estimated)", nrows)

        data = pandas.read_csv(path, dtype=str, na_filter=False, sep=sep, nrows=nrows)
    elif hasattr(data, "read"):
        # File object: seek to the end to learn its size, then rewind
        data.seek(0, 2)
        metadata["size"] = data.tell()
        data.seek(0, 0)
        data = pandas.read_csv(data, dtype=str, na_filter=False)
    elif not isinstance(data, pandas.DataFrame):
        raise TypeError(
            "data should be a filename, a file object, or a pandas.DataFrame"
        )

    # Step 2: Process DataFrame (common path for all input types)
    if indexes and (
        data.index.dtype != numpy.int64
        or not pandas.Index(numpy.arange(len(data))).equals(data.index)
    ):
        # Keep a meaningful index by turning it into regular column(s)
        data = data.reset_index()

    metadata["nb_rows"] = len(data)
    logger.info("DataFrame: %d rows, %d columns", data.shape[0], data.shape[1])

    # Compute stats on data before sampling (cheap operations)
    # Also extract 3 non-null sample values for geo classifier
    for col in data.columns:
        col_series = data[col]
        sample_values = []
        seen = set()
        for v in col_series:
            v_str = str(v).strip()
            # Skip empty values and the string forms of missing values
            if v_str in ("", "nan", "None") or v_str in seen:
                continue
            seen.add(v_str)
            sample_values.append(v_str)
            if len(sample_values) >= 3:
                break
        full_data_stats[col] = {
            "num_distinct_values": col_series.nunique(),
            "sample_values": sample_values,
        }

    # Sample if DataFrame exceeds target size
    avg_row_size = data.memory_usage(deep=True).sum() / max(len(data), 1)
    # Guard the division; with no measurable size there is nothing to trim
    max_rows = int(load_max_size / avg_row_size) if avg_row_size > 0 else len(data)
    if len(data) > max_rows:
        logger.info(
            "Sampling %d rows for profiling (target size: %d bytes)",
            max_rows,
            load_max_size,
        )
        data = data.sample(n=max_rows, random_state=RANDOM_SEED)

    metadata["nb_profiled_rows"] = len(data)

    # Convert to string dtype (workaround for pandas nan-as-'nan' bug)
    data = data.astype(object).fillna("").astype(str)

    return data, metadata, data.columns, full_data_stats
def process_column(
    array,
    column_meta,
    *,
    manual=None,
    plots=True,
    coverage=True,
    datamart_geo_data=None,
    nominatim=None,
    geo_prediction=None,  # Pre-computed from batch prediction
):
    """Identify the types of one column and compute its metadata.

    ``column_meta`` is updated in place with ``structural_type``,
    ``semantic_types`` (added to the ones already present), whatever
    additional metadata type identification produced and, depending on the
    flags, ``mean``/``stddev``/``coverage`` and ``plot``.

    :param array: the values of the column; they are iterated several times
        and are expected to be strings (the text histogram splits them with
        a regular expression)
    :param column_meta: metadata dict of the column, must contain ``name``
    :param manual: manual annotations for this column, forwarded to
        ``identify_types()``; when set, ``geo_prediction`` is ignored
    :param plots: whether to compute histograms
    :param coverage: whether to compute mean, stddev and numerical ranges
    :param datamart_geo_data: forwarded to ``identify_types()``
    :param nominatim: forwarded to ``nominatim_resolve_all()``; if None,
        addresses are not resolved
    :param geo_prediction: geo classifier prediction for this column (a dict
        with ``label`` and ``confidence``, optionally ``source`` and
        ``sample_values``); it is only used if its label is in
        ``GEO_CLASSIFIER_SPATIAL_MAP``
    :return: dict of resolved values to reuse for coverage computation, with
        the keys ``datetimes``, ``timestamps``, ``addresses`` and
        ``admin_areas`` present when applicable
    """
    # Local import: the module only imports the `datetime` class
    from datetime import timezone

    def utc_isoformat(timestamp):
        # Same naive-UTC result as the deprecated datetime.utcfromtimestamp()
        moment = datetime.fromtimestamp(float(timestamp), tz=timezone.utc)
        return moment.replace(tzinfo=None).isoformat()

    structural_type = None
    semantic_types_dict = {}
    additional_meta = {}
    used_geo_prediction = False

    # Use pre-computed geo_prediction if available and no manual override
    if geo_prediction is not None and manual is None:
        label = geo_prediction["label"]
        if label in GEO_CLASSIFIER_SPATIAL_MAP:
            # Geo classifier identified a spatial type
            used_geo_prediction = True
            structural_type, semantic_list = GEO_CLASSIFIER_SPATIAL_MAP[label]
            # The classifier resolves no values, hence None for each type
            semantic_types_dict = {st: None for st in semantic_list}
            additional_meta["geo_classifier"] = {
                "label": label,
                "confidence": geo_prediction["confidence"],
                "source": geo_prediction.get("source", "ml"),
            }

    # If geo prediction wasn't used, use regular identify_types
    if not used_geo_prediction:
        structural_type, semantic_types_dict, additional_meta = identify_types(
            array, column_meta["name"], datamart_geo_data, manual
        )

    # Log column type with source information
    if used_geo_prediction:
        geo_info = additional_meta.get("geo_classifier", {})
        # Samples are optional: always bind the string so logging cannot fail
        samples_str = ", ".join(
            str(v) for v in geo_prediction.get("sample_values", ())
        )
        logger.info(
            "Column type %s [%s] (from geo_classifier: column=%r, label=%s, confidence=%.4f, source=%s, samples=[%s])",
            structural_type,
            ", ".join(semantic_types_dict),
            column_meta.get("name", "unknown"),
            geo_info.get("label", "unknown"),
            geo_info.get("confidence", 0.0),
            geo_info.get("source", "ml"),
            samples_str,
        )
    else:
        logger.info(
            "Column type %s [%s]",
            structural_type,
            ", ".join(semantic_types_dict),
        )

    # Set structural type
    column_meta["structural_type"] = structural_type
    # Add semantic types to the ones already present
    sem_types = column_meta.setdefault("semantic_types", [])
    for sem_type in semantic_types_dict:
        if sem_type not in sem_types:
            sem_types.append(sem_type)
    # Insert additional metadata
    column_meta.update(additional_meta)

    # Resolved values are returned so they can be used again to compute spatial
    # coverage
    resolved = {}

    # Compute ranges for numerical data
    if structural_type in (types.INTEGER, types.FLOAT) and (coverage or plots):
        # Get numerical values needed for either ranges or plot
        numerical_values = []
        for e in array:
            try:
                e = float(e)
            except (TypeError, ValueError):
                # Not a number (TypeError covers None and other non-strings)
                continue
            if -3.4e38 < e < 3.4e38:  # Overflows in ES
                numerical_values.append(e)

        # Compute ranges from numerical values
        if coverage:
            column_meta["mean"], column_meta["stddev"] = mean_stddev(numerical_values)
            ranges = get_numerical_ranges(numerical_values)
            if ranges:
                column_meta["coverage"] = ranges

        # Compute histogram from numerical values
        if plots:
            counts, edges = numpy.histogram(numerical_values, bins=10)
            edges = [float(f) for f in edges]
            column_meta["plot"] = {
                "type": "histogram_numerical",
                "data": [
                    {
                        "count": int(count),
                        "bin_start": edges[i],
                        "bin_end": edges[i + 1],
                    }
                    for i, count in enumerate(counts)
                ],
            }

    if types.DATE_TIME in semantic_types_dict:
        datetimes = semantic_types_dict[types.DATE_TIME]
        resolved["datetimes"] = datetimes
        # NOTE(review): float32 keeps about 7 significant digits, which is
        # coarser than one second for present-day timestamps; kept as-is
        # because downstream ranges are built from these values -- confirm
        # before widening to float64.
        timestamps = numpy.empty(len(datetimes), dtype="float32")
        for j, dt in enumerate(datetimes):
            timestamps[j] = dt.timestamp()
        resolved["timestamps"] = timestamps

        # Compute histogram from temporal values
        if plots and "plot" not in column_meta:
            counts, edges = numpy.histogram(timestamps, bins=10)
            column_meta["plot"] = {
                "type": "histogram_temporal",
                "data": [
                    {
                        "count": int(count),
                        "date_start": utc_isoformat(edges[i]),
                        "date_end": utc_isoformat(edges[i + 1]),
                    }
                    for i, count in enumerate(counts)
                ],
            }

    # Compute histogram from categorical values
    if plots and types.CATEGORICAL in semantic_types_dict:
        # Empty values are not a category
        counter = collections.Counter(value for value in array if value)
        # Top 5 categories, presented in alphabetical order
        counts = sorted(counter.most_common(5))
        column_meta["plot"] = {
            "type": "histogram_categorical",
            "data": [{"bin": value, "count": count} for value, count in counts],
        }

    # Compute histogram from textual values
    if plots and types.TEXT in semantic_types_dict and "plot" not in column_meta:
        counter = collections.Counter(
            word.lower()
            for value in array
            for word in _re_word_split.split(value)
            if word
        )
        column_meta["plot"] = {
            "type": "histogram_text",
            "data": [
                {"bin": value, "count": count}
                for value, count in counter.most_common(5)
            ],
        }

    # Resolve addresses into coordinates
    if (
        nominatim is not None
        and structural_type == types.TEXT
        and types.TEXT in semantic_types_dict
        and types.ADMIN not in semantic_types_dict
    ):
        locations, non_empty = nominatim_resolve_all(nominatim, array)
        if non_empty > 0:
            unclean_ratio = 1.0 - len(locations) / non_empty
            # Only call it an address column if most values were resolved
            if unclean_ratio <= MAX_UNCLEAN_ADDRESSES:
                resolved["addresses"] = locations
                if types.ADDRESS not in column_meta["semantic_types"]:
                    column_meta["semantic_types"].append(types.ADDRESS)

    # Set level of administrative areas
    if types.ADMIN in semantic_types_dict:
        # None when the type came from the geo classifier (nothing resolved)
        admin_value = semantic_types_dict[types.ADMIN]
        if admin_value is not None:
            # disambiguate_admin_areas returns (level, areas) tuple or just areas
            if isinstance(admin_value, tuple) and len(admin_value) == 2:
                level, areas = admin_value
            else:
                # Fallback: admin_value is just the areas list
                level, areas = None, admin_value
            if level is not None:
                column_meta["admin_area_level"] = level
            resolved["admin_areas"] = areas

    return resolved
# Main entry point: profile a dataset and return its metadata.
def _predict_geo_columns(
    geo_classifier, threshold, columns, manual_columns, full_data_stats
):
    """Run the geo classifier once over all columns that have sample values.

    Columns with manual annotations are skipped. Returns a dict mapping the
    column index to the classifier's prediction, to which the
    ``sample_values`` given to the classifier are added.
    """
    columns_for_batch = []
    for column_idx, column_meta in enumerate(columns):
        name = column_meta["name"]
        if name in manual_columns:
            continue  # Skip columns with manual annotations
        # Use pre-extracted sample values from full dataset
        sample_values = full_data_stats.get(name, {}).get("sample_values")
        if sample_values:
            columns_for_batch.append((column_idx, name, sample_values))

    geo_predictions = {}
    if not columns_for_batch:
        return geo_predictions

    # BATCH PREDICTION - single forward pass for ALL columns!
    batch_inputs = [(name, vals) for _, name, vals in columns_for_batch]
    batch_results = geo_classifier.predict_batch(batch_inputs, threshold=threshold)
    for (column_idx, _, sample_values), prediction in zip(
        columns_for_batch, batch_results
    ):
        # Store prediction with sample values used
        prediction["sample_values"] = sample_values
        geo_predictions[column_idx] = prediction
    return geo_predictions


def _export_geo_predictions(columns, geo_predictions):
    """Append the spatial predictions to output/geo_classifier_results.csv.

    This is a side channel for analysis: a failure to write the file is
    logged and does not interrupt profiling.
    """
    geo_results = []
    for col_idx, prediction in geo_predictions.items():
        label = prediction.get("label", "non_spatial")
        if label == "non_spatial":
            continue
        geo_results.append(
            {
                "name": columns[col_idx]["name"],
                "values": prediction["sample_values"],
                "label": label,
            }
        )
    if not geo_results:
        return

    path = "output/geo_classifier_results.csv"
    try:
        # Only the first write creates the directory and emits the header
        append = os.path.exists(path)
        if not append:
            os.makedirs("output", exist_ok=True)
        pandas.DataFrame(geo_results).to_csv(
            path,
            index=False,
            mode="a" if append else "w",
            header=not append,
        )
    except OSError:
        logger.warning(
            "Could not save geo classifier results to %s", path, exc_info=True
        )
        return
    logger.info(
        "Saved %d geo classifier results to output/geo_classifier_results.csv",
        len(geo_results),
    )


def _point_sketch(kind, column_names, column_indexes, points):
    """Build one spatial coverage entry (ranges and geohashes) from points."""
    # Ranges
    spatial_ranges = get_spatial_ranges(points)
    # Geohashes
    builder = Geohasher(number=MAX_GEOHASHES)
    builder.add_points(points)
    hashes = builder.get_hashes_json()
    return {
        "type": kind,
        "column_names": column_names,
        "column_indexes": column_indexes,
        "geohashes4": hashes,
        "ranges": spatial_ranges,
        "number": len(points),
    }


def _admin_sketch(name, idx, areas):
    """Build the coverage entry of an administrative areas column.

    Returns None if neither a bounding box nor geohashes could be computed.
    """
    cov = {
        "type": "admin",
        "column_names": [name],
        "column_indexes": [idx],
    }

    # Merge into a single range
    merged = None
    for area in areas:
        if area is None:
            continue
        new = area.bounds
        if not new:
            continue
        if merged is None:
            merged = new
        else:
            merged = (
                min(merged[0], new[0]),
                max(merged[1], new[1]),
                min(merged[2], new[2]),
                max(merged[3], new[3]),
            )
    # A box that is degenerate on either axis is not reported
    if (
        merged is not None
        and merged[1] - merged[0] > 0.01
        and merged[3] - merged[2] > 0.01
    ):
        logger.info("Computed bounding box")
        cov["ranges"] = [
            {
                "range": {
                    "type": "envelope",
                    "coordinates": [
                        [merged[0], merged[3]],
                        [merged[1], merged[2]],
                    ],
                },
            },
        ]
    else:
        logger.info("Couldn't build a bounding box")

    # Compute geohashes
    builder = Geohasher(number=MAX_GEOHASHES)
    for area in areas:
        if area is None or not area.bounds:
            continue
        builder.add_aab(area.bounds)
    hashes = builder.get_hashes_json()
    if hashes:
        cov["geohashes4"] = hashes

    # Count
    cov["number"] = builder.total

    if "ranges" in cov or "geohashes4" in cov:
        return cov
    return None


def _spatial_coverage(data, columns, latlong_pairs, resolved_columns):
    """Compute the spatial coverage entries of a dataset.

    Covers, in this order: lat/long column pairs, WKT point columns, then
    for each column its resolved addresses and administrative areas.
    """
    spatial_coverage = []

    # Compute sketches from lat/long pairs
    for col_lat, col_long in latlong_pairs:
        lat_values = pandas.to_numeric(data.iloc[:, col_lat.index], errors="coerce")
        long_values = pandas.to_numeric(
            data.iloc[:, col_long.index], errors="coerce"
        )
        # Keep rows where both coordinates parsed and are strictly in range
        mask = (
            ~numpy.isnan(lat_values)
            & ~numpy.isnan(long_values)
            & (-90.0 < lat_values)
            & (lat_values < 90.0)
            & (-180.0 < long_values)
            & (long_values < 180.0)
        )
        if not mask.any():
            continue
        values = numpy.array([lat_values[mask], long_values[mask]]).T
        logger.info(
            "Computing spatial sketch lat=%r long=%r (%d rows)",
            col_lat.name,
            col_long.name,
            len(values),
        )
        spatial_coverage.append(
            _point_sketch(
                "latlong",
                [col_lat.name, col_long.name],
                [col_lat.index, col_long.index],
                values,
            )
        )

    # Compute sketches from WKT points
    for i, col in enumerate(columns):
        if col["structural_type"] != types.GEO_POINT:
            continue
        latlong = col.get("point_format") == "lat,long"
        name = col["name"]
        values = parse_wkt_column(data.iloc[:, i], latlong=latlong)
        # Number of non-empty cells, to judge how many failed to parse
        total = numpy.sum(data.iloc[:, i].apply(bool))
        if len(values) < 0.5 * total:
            logger.warning(
                "Most data points did not parse correctly as "
                "point (%s) col=%d %r",
                "lat,long" if latlong else "long,lat",
                i,
                col,
            )
        if values:
            logger.info(
                "Computing spatial sketches point=%r (%d rows)",
                name,
                len(values),
            )
            spatial_coverage.append(
                _point_sketch(
                    "point_latlong" if latlong else "point",
                    [name],
                    [i],
                    values,
                )
            )

    for idx, resolved in resolved_columns.items():
        # Compute sketches from addresses
        if "addresses" in resolved:
            locations = resolved["addresses"]
            name = columns[idx]["name"]
            logger.info(
                "Computing spatial sketches address=%r (%d rows)",
                name,
                len(locations),
            )
            spatial_coverage.append(
                _point_sketch("address", [name], [idx], locations)
            )

        # Compute sketches from administrative areas
        if "admin_areas" in resolved:
            areas = resolved["admin_areas"]
            name = columns[idx]["name"]
            logger.info(
                "Computing spatial sketches admin_areas=%r (%d rows)",
                name,
                len(areas),
            )
            cov = _admin_sketch(name, idx, areas)
            if cov is not None:
                spatial_coverage.append(cov)

    return spatial_coverage


def _temporal_coverage(columns, resolved_columns):
    """Compute the temporal coverage entries of the datetime columns."""
    temporal_coverage = []
    for idx, col in enumerate(columns):
        if types.DATE_TIME not in col["semantic_types"]:
            continue
        resolved = resolved_columns[idx]
        if "datetimes" not in resolved:
            # The semantic type was already in the provided metadata but no
            # datetimes were parsed from this column
            logger.warning(
                "No parsed datetimes for column %r, skipping temporal coverage",
                col["name"],
            )
            continue
        datetimes = resolved["datetimes"]
        timestamps = resolved["timestamps"]
        logger.info(
            "Computing temporal ranges datetime=%r (%d rows)",
            col["name"],
            len(datetimes),
        )

        # Get temporal ranges
        ranges = get_numerical_ranges(timestamps)
        if not ranges:
            continue

        # Get temporal resolution
        resolution = get_temporal_resolution(datetimes)

        temporal_coverage.append(
            {
                "type": "datetime",
                "column_names": [col["name"]],
                "column_indexes": [idx],
                "column_types": [types.DATE_TIME],
                "ranges": ranges,
                "temporal_resolution": resolution,
            }
        )
    # TODO: Times split over multiple columns
    return temporal_coverage


def _log_pipeline_summary(data, step_times, total_time):
    """Log a table of the time spent in each pipeline step."""
    heavy_rule = "=" * 60
    light_rule = "-" * 60
    summary_parts = [
        f"\n{heavy_rule}",
        "PROFILING PIPELINE SUMMARY",
        heavy_rule,
        f"Dataset: {data.shape[0]} rows × {data.shape[1]} columns",
        light_rule,
    ]
    for step_name, step_time in step_times.items():
        pct = (step_time / total_time * 100) if total_time > 0 else 0
        bar_len = int(pct / 2)  # 50 chars = 100%
        bar = "█" * bar_len + "░" * (50 - bar_len)
        summary_parts.append(f"{step_name:25s} {step_time:7.3f}s ({pct:5.1f}%) {bar}")
    summary_parts.append(light_rule)
    summary_parts.append(f"{'TOTAL':25s} {total_time:7.3f}s")
    summary_parts.append(f"{heavy_rule}\n")
    logger.info("\n".join(summary_parts))


def process_dataset(
    data,
    geo_classifier=True,
    geo_classifier_threshold=0.5,
    include_sample=False,
    coverage=True,
    plots=False,
    indexes=True,
    load_max_size=None,
    metadata=None,
    nominatim=None,
    datamart_geo_data=None,
    **kwargs,
):
    """Compute all metafeatures from a dataset.

    :param data: path to dataset, or file object, or DataFrame
    :param geo_classifier: ``True`` to enable geo_classifier (a
        ``HybridGeoClassifier`` is created), a false value to disable it, or
        a classifier object providing ``predict_batch()``
    :param geo_classifier_threshold: Confidence threshold passed to the
        geo_classifier (default: 0.5). Predictions below this threshold will
        be treated as "non_spatial".
    :param include_sample: Set to True to include a few random rows to the
        result. Useful to present to a user.
    :param coverage: Whether to compute data ranges
    :param plots: Whether to compute plots
    :param indexes: Whether to include indexes. If True (the default), the
        input is a DataFrame, and it has index(es) different from the default
        range, they will appear in the result with the columns.
    :param load_max_size (bytes): Target size of the data to be analyzed. The data will
        be randomly sampled if it is bigger. Defaults to `MAX_SIZE`, currently
        5 MB (5000000). This is different from the sample data included in the result.
    :param metadata: The metadata provided by the discovery plugin (might be
        very limited). It is updated in place and returned.
    :param nominatim: URL of the Nominatim server
    :param datamart_geo_data: ``True`` or a datamart_geo.GeoData instance to use to
        resolve named administrative territorial entities
    :return: JSON structure (dict)
    :raises TypeError: on unexpected keyword arguments
    :raises ValueError: if the provided column metadata doesn't match the data

    As a side effect, spatial geo_classifier predictions are appended to
    ``output/geo_classifier_results.csv`` in the working directory.
    """
    # Track runtime for each pipeline step
    pipeline_start = time.perf_counter()
    step_times = {}

    if "sample_size" in kwargs:
        warnings.warn(
            "Argument 'sample_size' is deprecated, use 'load_max_size'",
            DeprecationWarning,
            stacklevel=2,
        )
        load_max_size = kwargs.pop("sample_size")
    if kwargs:
        raise TypeError(
            "process_dataset() got unexpected keyword argument %r" % next(iter(kwargs))
        )

    if datamart_geo_data is True:
        # Imported lazily: only needed when the default GeoData is requested
        from datamart_geo import GeoData

        datamart_geo_data = GeoData.from_local_cache()

    if geo_classifier is True:
        geo_classifier = HybridGeoClassifier(GeoClassifier())

    if metadata is None:
        metadata = {}

    # =========================================================================
    # STEP 1: Load data
    # =========================================================================
    step_start = time.perf_counter()
    logger.info("[STEP 1/6] Loading data...")
    try:
        data, file_metadata, column_names, full_data_stats = load_data(
            data,
            load_max_size=load_max_size,
            indexes=indexes,
        )
    except EmptyDataError:
        logger.warning("Dataframe is empty!")
        metadata["nb_rows"] = 0
        metadata["nb_profiled_rows"] = 0
        metadata["columns"] = []
        metadata["types"] = []
        return metadata
    step_times["1_load_data"] = time.perf_counter() - step_start
    logger.info(
        "[STEP 1/6] Data loaded in %.3fs (%d rows, %d columns)",
        step_times["1_load_data"],
        data.shape[0],
        data.shape[1],
    )

    metadata.update(file_metadata)
    metadata["nb_profiled_rows"] = data.shape[0]
    metadata["nb_columns"] = data.shape[1]

    if "columns" in metadata:
        columns = metadata["columns"]
        logger.info("Using provided columns info")
        if len(columns) != len(data.columns):
            raise ValueError("Column metadata doesn't match number of columns")
        for column_meta, name in zip(columns, column_names):
            if "name" in column_meta and column_meta["name"] != name:
                raise ValueError("Column names don't match")
            column_meta["name"] = name
    else:
        logger.info("Setting column names from header")
        columns = [{"name": name} for name in column_names]
        metadata["columns"] = columns

    if data.shape[0] == 0:
        logger.info("0 rows, returning early")
        metadata["types"] = []
        return metadata

    # Get manual updates from the user
    manual_columns = {}
    if "manual_annotations" in metadata:
        if "columns" in metadata["manual_annotations"]:
            manual_columns = {
                col["name"]: col for col in metadata["manual_annotations"]["columns"]
            }

    # Cache some values that have been resolved for type identification but are
    # useful for spatial coverage computation: admin areas and addresses
    # Having to resolve them once to see if they're valid and a second time to
    # build coverage information would be too slow
    resolved_columns = {}

    # =========================================================================
    # STEP 2: Batch ML prediction for ALL columns (single forward pass!)
    # =========================================================================
    step_start = time.perf_counter()
    logger.info("[STEP 2/6] Geo classifier batch prediction...")
    geo_predictions = {}  # column_idx -> prediction dict
    if geo_classifier:
        geo_predictions = _predict_geo_columns(
            geo_classifier,
            geo_classifier_threshold,
            columns,
            manual_columns,
            full_data_stats,
        )
    step_times["2_geo_batch_predict"] = time.perf_counter() - step_start

    # Incrementally save geo attributes to CSV
    if geo_predictions:
        _export_geo_predictions(columns, geo_predictions)

    logger.info(
        "[STEP 2/6] Geo batch prediction completed in %.3fs (%d columns)",
        step_times["2_geo_batch_predict"],
        len(geo_predictions),
    )

    # =========================================================================
    # STEP 3: Process columns (apply predictions + compute features)
    # =========================================================================
    step_start = time.perf_counter()
    logger.info("[STEP 3/6] Processing %d columns...", len(columns))
    for col_idx, column_meta in enumerate(columns):
        col_name = column_meta["name"]
        resolved_columns[col_idx] = process_column(
            # By position, like the coverage code, so that duplicated column
            # names still give a single column
            data.iloc[:, col_idx],
            column_meta,
            manual=manual_columns.get(col_name),
            plots=plots,
            coverage=coverage,
            datamart_geo_data=datamart_geo_data,
            nominatim=nominatim,
            geo_prediction=geo_predictions.get(col_idx),
        )

        # Override with exact stats from full data (computed before sampling)
        if col_name in full_data_stats and "num_distinct_values" in column_meta:
            column_meta["num_distinct_values"] = full_data_stats[col_name][
                "num_distinct_values"
            ]

    step_times["3_process_columns"] = time.perf_counter() - step_start
    logger.info(
        "[STEP 3/6] Column processing completed in %.3fs",
        step_times["3_process_columns"],
    )

    # =========================================================================
    # STEP 4: Pair lat/long columns and determine dataset types
    # =========================================================================
    step_start = time.perf_counter()
    logger.info("[STEP 4/6] Pairing lat/long columns and determining dataset types...")

    def candidates(semantic_type):
        # Columns carrying the semantic type, with their manual pairing hint
        return [
            LatLongColumn(
                index=col_idx,
                name=col["name"],
                annot_pair=manual_columns.get(col["name"], {}).get("latlong_pair"),
            )
            for col_idx, col in enumerate(columns)
            if semantic_type in col["semantic_types"]
        ]

    # Pair lat & long columns
    columns_lat = candidates(types.LATITUDE)
    columns_long = candidates(types.LONGITUDE)
    latlong_pairs, (missed_lat, missed_long) = pair_latlong_columns(
        columns_lat, columns_long
    )

    # Log missed columns
    if missed_lat:
        logger.warning("Unmatched latitude columns: %r", missed_lat)
    if missed_long:
        logger.warning("Unmatched longitude columns: %r", missed_long)

    # Remove semantic type from unpaired columns
    for col in columns:
        if col["name"] in missed_lat:
            col["semantic_types"].remove(types.LATITUDE)
        if col["name"] in missed_long:
            col["semantic_types"].remove(types.LONGITUDE)

    # Identify the overall dataset types (numerical, categorical, spatial, or temporal)
    dataset_types = collections.Counter()
    for column_meta in columns:
        dataset_type = determine_dataset_type(
            column_meta["structural_type"],
            column_meta["semantic_types"],
        )
        if dataset_type:
            dataset_types[dataset_type] += 1
    for key, d_type in [
        ("nb_spatial_columns", types.DATASET_SPATIAL),
        ("nb_temporal_columns", types.DATASET_TEMPORAL),
        ("nb_categorical_columns", types.DATASET_CATEGORICAL),
        ("nb_numerical_columns", types.DATASET_NUMERICAL),
    ]:
        if dataset_types[d_type]:
            metadata[key] = dataset_types[d_type]
    metadata["types"] = sorted(set(dataset_types))

    step_times["4_latlong_pairing"] = time.perf_counter() - step_start
    logger.info(
        "[STEP 4/6] Lat/long pairing completed in %.3fs (%d pairs found)",
        step_times["4_latlong_pairing"],
        len(latlong_pairs),
    )

    if coverage:
        # =====================================================================
        # STEP 5: Compute spatial coverage
        # =====================================================================
        step_start = time.perf_counter()
        logger.info("[STEP 5/6] Computing spatial coverage...")
        spatial_coverage = _spatial_coverage(
            data, columns, latlong_pairs, resolved_columns
        )
        if spatial_coverage:
            metadata["spatial_coverage"] = spatial_coverage
        step_times["5_spatial_coverage"] = time.perf_counter() - step_start
        logger.info(
            "[STEP 5/6] Spatial coverage completed in %.3fs (%d coverage entries)",
            step_times["5_spatial_coverage"],
            len(spatial_coverage),
        )

        # =====================================================================
        # STEP 6: Compute temporal coverage
        # =====================================================================
        step_start = time.perf_counter()
        logger.info("[STEP 6/6] Computing temporal coverage...")
        temporal_coverage = _temporal_coverage(columns, resolved_columns)
        if temporal_coverage:
            metadata["temporal_coverage"] = temporal_coverage
        step_times["6_temporal_coverage"] = time.perf_counter() - step_start
        logger.info(
            "[STEP 6/6] Temporal coverage completed in %.3fs (%d coverage entries)",
            step_times["6_temporal_coverage"],
            len(temporal_coverage),
        )
    else:
        step_times["5_spatial_coverage"] = 0
        step_times["6_temporal_coverage"] = 0
        logger.info("[STEP 5-6/6] Coverage computation skipped (coverage=False)")

    # Attribute names
    attribute_keywords = []
    for col in columns:
        attribute_keywords.append(col["name"])
        kw = list(expand_attribute_name(col["name"]))
        # Don't repeat a name that expands to itself
        if kw != [col["name"]]:
            attribute_keywords.extend(kw)
    metadata["attribute_keywords"] = attribute_keywords

    # Sample data
    if include_sample:
        step_start = time.perf_counter()
        rand = numpy.random.RandomState(RANDOM_SEED)
        choose_rows = rand.choice(
            len(data),
            min(SAMPLE_ROWS, len(data)),
            replace=False,
        )
        choose_rows.sort()  # Keep it in order
        sample = data.iloc[choose_rows]
        sample = sample.map(truncate_string)  # Truncate long values
        metadata["sample"] = sample.to_csv(index=False, lineterminator="\r\n")
        step_times["sample_extraction"] = time.perf_counter() - step_start

    # =========================================================================
    # PIPELINE SUMMARY
    # =========================================================================
    total_time = time.perf_counter() - pipeline_start
    _log_pipeline_summary(data, step_times, total_time)

    # Also store timing in metadata for analysis
    metadata["_profiling_times"] = {
        "steps": step_times,
        "total": total_time,
    }

    return metadata