dathere/datapusher-plus

DataPusher+

NOTE: v2 is a major revamp. Documentation is currently WIP.

DataPusher+ is a fork of Datapusher that combines the speed and robustness of ckanext-xloader with the data type guessing of Datapusher - super-powered with the ability to infer, calculate & suggest metadata using Jinja2 formulas defined in the scheming configuration file.

Formulas have access not only to the package and resource fields (in namespaces of the same name) but also to the following information in these additional namespaces, all of which can be used in Jinja2 expressions:

  • dpps - with the "s" for stats.
    Each field will have an extensive list of summary statistics (by default: type, is_ascii, sum, min/max, range, sort_order, sortiness, min_length, max_length, sum_length, avg_length, stddev_length, variance_length, cv_length, mean, sem, geometric_mean, harmonic_mean, stddev, variance, cv, nullcount, max_precision, sparsity, cardinality, uniqueness_ratio.) Check here for all other available statistics.
  • dppf - with the "f" for frequency table.
    Each field will have a frequency table listing its top N values (configurable, default 10) sorted in descending order, each with a corresponding count & percentage. "Other (COUNT)" will be used as a "basket" for other values with COUNT set to the count of other values beyond the top N. ID fields will be indicated by "<ALL_UNIQUE>" in the table.
  • dpp - additional inferred/calculated metadata.
    • ORIGINAL_FILE_SIZE (bytes)
    • PREVIEW_FILE_SIZE (bytes)
    • RECORD_COUNT (int)
    • PREVIEW_RECORD_COUNT (int)
    • IS_SORTED (bool)
    • DEDUPED (bool)
    • DUPE_COUNT (int: -1 if there are no dupes)
    • Date/DateTime metadata
      DP+ can infer date/datetime columns - supporting 19 different formats. As it is a relatively expensive operation, it will only do so for candidate columns with names that fit a configurable pattern.
      • DATE_FIELDS - a list of inferred date columns
      • NO_DATE_FIELDS (bool)
      • DATETIME_FIELDS - a list of inferred datetime columns
      • NO_DATETIME_FIELDS (bool)
    • Latitude/Longitude metadata
      DP+ can infer the latitude and longitude columns based on each column's characteristics. A column is inferred to be a latitude/longitude column if:
      • its name is in a comma-separated, priority-ordered list of lat/long name patterns,
      • for latitude, it is of type "Float" with a range of -90.0 to 90.0, and
      • for longitude, it is of type "Float" with a range of -180.0 to 180.0.
      • LAT_FIELD and LON_FIELD - the inferred lat/long columns
      • NO_LAT_LONG_FIELDS (bool)
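
The latitude/longitude rules above can be sketched in Python as follows. This is a minimal illustration, not DP+'s actual implementation: `ColumnStats`, `pick_field` and `infer_lat_lon` are hypothetical names, and both the matching rule (case-insensitive exact match on the column name) and the meaning of `NO_LAT_LONG_FIELDS` (true unless both fields are found) are assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ColumnStats:
    """Hypothetical per-column summary: name, inferred type, min and max."""
    name: str
    type: str
    min: float
    max: float


def pick_field(columns, name_patterns, lo, hi) -> Optional[str]:
    """Return the first column, in pattern priority order, that is a Float within [lo, hi]."""
    by_name = {c.name.lower(): c for c in columns}
    for pattern in name_patterns:  # earlier patterns take priority
        col = by_name.get(pattern.strip().lower())
        if col is not None and col.type == "Float" and lo <= col.min and col.max <= hi:
            return col.name
    return None


def infer_lat_lon(columns, lat_patterns="latitude,lat", lon_patterns="longitude,long,lon"):
    """Apply the name, type and range checks to find the lat/long columns."""
    lat = pick_field(columns, lat_patterns.split(","), -90.0, 90.0)
    lon = pick_field(columns, lon_patterns.split(","), -180.0, 180.0)
    # Assumption: the flag is set unless both a latitude and a longitude column are found.
    return {"LAT_FIELD": lat, "LON_FIELD": lon, "NO_LAT_LONG_FIELDS": lat is None or lon is None}
```

The default pattern lists mirror the `latitude_fields` and `longitude_fields` values shown in the configuration section of this README.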

Beyond the extensive list of built-in Jinja2 filters/functions, DP+ also supports an extensive list of additional custom filters/functions. Several of these helper functions make it trivially easy to calculate DCAT 3 recommended and optional properties that would ordinarily be too painstaking to compile manually (e.g., dcat-us:GeographicBoundingBox, dcat:temporalResolution, dcat:startDate, dcat:endDate, etc.).

There are two Formula types that are indicated by adding these keywords to the scheming yaml file:

  • formula - the formula will be evaluated at resource creation/update time and the result is assigned to the corresponding package/resource field immediately.
  • suggest_formula - the formula will be evaluated at resource creation/update time and the result is stored in the dpp_suggestions package field as a compound JSON object. dpp_suggestions contains all the suggestions for both package and resource fields. This field is parsed to show "Suggestions" during metadata entry for the associated package/resource field using the Suggestion UI (indicated by a function symbol next to the metadata field name).

Formulas that fail to evaluate return a value that starts with the #ERROR!: prefix (reminiscent of Excel's #VALUE! error), followed by a detailed Jinja2 error message.

In addition, Datapusher+ is no longer a webservice, but a full-fledged CKAN extension. It drops usage of the deprecated CKAN Service Provider, with the unmaintained Messytables replaced by the blazing-fast qsv data-wrangling engine.

TxGIO/TWDB provided the use cases that informed and supported the development of Datapusher+, specifically, to support a Resource-first upload workflow.

For a more detailed overview, see the CKAN Monthly Live Jan 2023 presentation.

It features:

  • "Bullet-proof", ultra-fast data type inferencing with qsv

    Unlike Messytables, which scans only the first few rows to guess the type of a column, qsv scans the entire table, so its data type inferences are guaranteed [1].

    Even though it scans the whole file, qsv is still much faster, and it not only infers data types but also calculates summary statistics. For example, scanning a 2.7 million row, 124MB CSV file for types and stats took 0.16 seconds [2].

    It is very fast as qsv is written in Rust, is multithreaded, and uses all kinds of performance techniques especially designed for data-wrangling.

  • Exponentially faster loading speed

    Similar to xloader, we use PostgreSQL COPY to directly pipe the data into the datastore, short-circuiting the additional processing/transformation/API calls used by the original Datapusher.

    But unlike xloader, we load everything using the proper data types and not as text, so there's no need to reload the data again after adjusting the Data Dictionary, as you would with xloader.

  • Far more Storage Efficient AND Performant Datastore with easier to compose SQL queries

    Because we create the Datastore tables using the most efficient PostgreSQL data type for each column, based on qsv's guaranteed type inferences, the Datastore is not only more storage efficient, it is also far more performant for loading AND querying.

    With its "smartint" data type (with qsv inferring the most efficient integer data type for the range of values in the column); comprehensive date format inferencing (supporting 19 date formats, with each format having several variants & with configurable DMY/MDY preference parsing) & auto-formatting dates to RFC3339 format so they are stored as Postgres timestamps; cardinality-aware, configurable auto-indexing; automatic sanitization of column names to valid PostgreSQL column identifiers; auto PostgreSQL vacuuming & analysis of resources after loading; and more - DP+ enables the Datastore to tap into PostgreSQL's full power.

    Configurable auto-aliasing of resources also makes it easier to compose SQL queries, as you can use more intuitive resource aliases instead of cryptic resource IDs.

  • Production-ready Robustness

    In production, the number one source of support issues is Datapusher - primarily, because of data quality issues and Datapusher's inability to correctly infer data types, gracefully handle errors [3], and provide the Data Publisher actionable information to correct the data.

    Datapusher+'s design directly addresses all these issues.

  • More informative datastore loading messages

    Datapusher+ messages are designed to be more verbose and actionable, so the data publisher's user experience is far better and makes it possible to have a resource-first upload workflow.

  • Extended preprocessing with qsv

    qsv is leveraged by Datapusher+ to:

    • create "Smarter" Data Dictionaries, with:
      • guaranteed data type inferences
      • optional ability to automatically choose the best integer PostgreSQL data type ("smartint") based on the range of the numeric column (PostgreSQL's int, bigint and numeric types) for optimal storage/indexing efficiency and SQL query performance.
      • sanitized column names (guaranteeing valid PostgreSQL column identifiers) while preserving the original column name as a label, which is used to label columns in DataTables_view.
      • an optional "summary stats" resource as an extension of the Data Dictionary, with comprehensive summary statistics for each column - sum, min/max/range, min/max length, mean, stddev, variance, nullcount, sparsity, quartiles, IQR, lower/upper fences, skewness, median, mode/s, antimode/s & cardinality.
    • convert Excel & OpenOffice/LibreOffice Calc (ODS) files to CSV, with the ability to choose which sheet to use by default (e.g. 0 is the first sheet, -1 is the last sheet, -2 the second to last sheet, etc.)
    • convert SHP and GeoJSON files to CSV, with optional geometry simplification.
    • decompress ZIP archives and insert the manifest as a CSV file with detailed metadata about the files in the archive. For ZIP archives with only one recognized file format, it can also automatically decompress the file and push that instead of the ZIP manifest into the Datastore.
    • convert various date formats (19 date formats are recognized with each format having several variants; ~80 date format permutations in total) to a standard RFC 3339 format
    • enable random access of a CSV by creating a CSV index - which also enables parallel processing of different parts of a CSV simultaneously (a major reason type inferencing and stats calculation is so fast)
    • instantaneously count the number of rows with a CSV index
    • validate if an uploaded CSV conforms to the RFC-4180 standard
    • normalize and transcode CSV/TSV dialects into a standard UTF-8 encoded RFC-4180 CSV format
    • optionally create a preview subset, with the ability to only download the first n preview rows of a file, and not the entire file (e.g. only download the first 1,000 rows of a 3 GB CSV file - especially good for harvesting/cataloging external sites where you only want to harvest the metadata and a small sample of each file).
    • optionally create a preview subset from the end of a file (e.g. last 1,000 rows, good for time-series/sensor data)
    • auto-index columns based on their cardinality/format (unique indices created for columns with all unique values, auto-index columns whose cardinality is below a given threshold; auto-index date columns)
    • check for duplicates, and optionally deduplicate rows
    • optionally screen for Personally Identifiable Information (PII), with an option to "quarantine" the PII-candidate rows in a separate resource, while still creating the screened resource.
    • optional ability to specify a custom PII screening regex set, instead of the default PII screening regex set.

    Even with all these pre-processing tasks, qsv typically takes less than 5 seconds to finish all its analysis tasks, even for a 100 MB CSV file.

    Future versions of Datapusher+ will further leverage qsv's 80+ commands to do additional preprocessing, data-wrangling and validation. The Roadmap is available here. Ideas, suggestions and your feedback are most welcome!
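
As an illustration of the "smartint" option described above, the following Python sketch picks the smallest of the three PostgreSQL types the README names (int, bigint, numeric) from a column's min/max range. The function name `smartint_type` is hypothetical, and the logic is an assumption about how such a choice could be made, not qsv's or DP+'s actual code; only the PostgreSQL range limits are standard.

```python
# Value ranges of PostgreSQL's signed 32-bit "integer" and 64-bit "bigint" types.
INT_MIN, INT_MAX = -2**31, 2**31 - 1
BIGINT_MIN, BIGINT_MAX = -2**63, 2**63 - 1


def smartint_type(col_min: int, col_max: int) -> str:
    """Return the narrowest PostgreSQL type that can hold every value in [col_min, col_max]."""
    if INT_MIN <= col_min and col_max <= INT_MAX:
        return "integer"
    if BIGINT_MIN <= col_min and col_max <= BIGINT_MAX:
        return "bigint"
    # Values outside the 64-bit range need arbitrary-precision numeric.
    return "numeric"


for lo, hi in [(0, 100), (0, 2**31), (-2**63 - 1, 0)]:
    print(lo, hi, smartint_type(lo, hi))
```

Because the column's min and max come from a scan of the whole file, the chosen type is safe for every row that was scanned, which is what makes a narrow type usable here.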

DRUF: Dataset Resource Upload First Workflow

DataPusher+ supports an optional DRUF (Dataset Resource Upload First) workflow that allows users to upload data files before creating dataset metadata. This resource-first approach is particularly useful for:

  • Data-driven workflows: Where the structure and content of the data informs the metadata
  • Exploratory data publishing: When you want to examine the data before writing descriptions
  • Simplified workflows: Reducing the cognitive load of filling out metadata forms upfront

How DRUF Works

When DRUF is enabled, the dataset creation workflow is modified:

  1. "Add Dataset" buttons redirect to a resource upload page instead of the metadata form
  2. Temporary datasets are automatically created with placeholder metadata
  3. Resource upload happens first, allowing DataPusher+ to analyze the data
  4. Metadata forms are enhanced with data-driven suggestions based on the uploaded content
  5. Form redirects guide users through a logical resource-first workflow

Enabling DRUF

  • To enable DRUF, you need a DRUF-compatible CKAN version.
  • You need to have the scheming extension enabled and use the example DRUF-compatible schema included in the DP+ extension.

Add the following configuration to your CKAN config file (e.g., /etc/ckan/default/ckan.ini):

# Enable DRUF (Dataset Resource Upload First) workflow
ckanext.datapusher_plus.enable_druf = true
ckanext.datapusher_plus.enable_form_redirect = true

Backwards Compatibility

DRUF is completely optional and disabled by default. When disabled:

  • Standard CKAN dataset creation workflow is preserved
  • No template modifications are applied
  • Full backwards compatibility with existing CKAN installations

Requirements:

  • CKAN 2.10+
  • Python 3.10+
  • tested and developed on Ubuntu 22.04.5
  • ckan.datastore.sqlsearch.enabled set to true if you want to use the temporal_resolution and guess_accrual_periodicity Formula helpers
  • ckanext-scheming extension

Development Installation

From version 1.0.0 onwards, Datapusher+ is installed as a CKAN extension and is available as a CKAN plugin. This allows for easier integration with CKAN and other CKAN extensions.

  1. Install the required packages. We expect you are using a Linux distribution based on Ubuntu such as Ubuntu 24.04.
sudo apt install python3-virtualenv python3-dev python3-pip python3-wheel build-essential libxslt1-dev libxml2-dev zlib1g-dev git libffi-dev libpq-dev uchardet -y
  1. Activate the CKAN virtual environment, which must use Python 3.10 or later.
. /usr/lib/ckan/default/bin/activate
  1. Install the extension using the following commands:
cd /usr/lib/ckan/default/src
pip install -e "datapusher-plus@git+https://github.com/dathere/datapusher-plus.git@3.0.0"
  1. Install the dependencies.
cd datapusher-plus
pip install -r requirements.txt
pip install -r requirements-dev.txt
  1. Install qsv (specifically, the qsvdp binary) and move it to /usr/local/bin/qsvdp so that it is accessible through the PATH environment variable.
qsv installation options

Option 1: Install qsv from the Debian package

If you are running a Debian-based Linux distribution on x86_64, you can quickly install qsv binaries including qsvdp using the following commands:

# Add the qsv repository to your sources list:
echo "deb [signed-by=/etc/apt/trusted.gpg.d/qsv-deb.gpg] https://dathere.github.io/qsv-deb-releases ./" > qsv.list
# Import trusted GPG key:
wget -O - https://dathere.github.io/qsv-deb-releases/qsv-deb.gpg | sudo apt-key add -
# Install qsv:
sudo apt update -y
sudo apt install qsv -y

Option 2: Install prebuilt qsv binaries

Download the appropriate prebuilt binaries for your platform and copy them to the appropriate directory. For example, you can use the following commands for qsv v20.0.0 on x86_64 Linux (you can update the version 20.0.0 to the latest version available on the releases page):

wget https://github.com/dathere/qsv/releases/download/20.0.0/qsv-20.0.0-x86_64-unknown-linux-gnu.zip
unzip qsv-20.0.0-x86_64-unknown-linux-gnu.zip
rm qsv-20.0.0-x86_64-unknown-linux-gnu.zip
sudo mv qsv* /usr/local/bin

If you get glibc errors when starting qsv, your Linux distro may not have the required version of the GNU C Library. If so, use the binaries ending with unknown-linux-musl instead, as they are statically linked with the MUSL C Library.

ℹ️ NOTE: qsv's prebuilt binaries have the ability to self-update to the latest version. Just run qsv with the --update option and it will check for the latest version and update itself as required.

sudo qsvdp --update

Option 3: Install qsv from source

Alternatively, if you want to install qsv from source, follow the instructions here. Note that when compiling from source, you may want to look into the Performance Tuning section to squeeze even more performance from qsv.

Also, if you get glibc errors when starting qsv, your Linux distro may not have the required version of the GNU C Library (This will be the case when running Ubuntu 18.04 or older). If so, use the unknown-linux-musl.zip archive as it is statically linked with the MUSL C Library.

If you already have qsv, update it to the latest release by using the --update option.

qsvdp --update

ℹ️ NOTE: qsv is a general purpose CSV data-wrangling toolkit that gets regular updates. To update to the latest version, just run qsv with the --update option and it will check for the latest version and update as required.

Finally, you can build qsvdp from source. It has the additional benefit that the resulting binary will take advantage of all the machine's CPU features, making qsv and DP+ even faster, but may take up to 30 minutes to compile.

git clone https://github.com/dathere/qsv.git
cd qsv

# install Rust, if it's not installed
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# build qsvdp
CARGO_BUILD_RUSTFLAGS='-C target-cpu=native' cargo build --release --locked --bin qsvdp -F datapusher_plus
sudo cp target/release/qsvdp /usr/local/bin
cargo clean
  1. Make sure CKAN is running (e.g. through ckan -c /etc/ckan/default/ckan.ini run after activating your virtual environment). While CKAN is running, create an API token for the DP+ service account. The following command also adds the corresponding ckanext.datapusher_plus.api_token line to your CKAN config file (/etc/ckan/default/ckan.ini). Replace CKAN_ADMIN in the command with an existing CKAN user with sysadmin privileges.
ckan config-tool /etc/ckan/default/ckan.ini "ckanext.datapusher_plus.api_token=$(ckan -c /etc/ckan/default/ckan.ini user token add CKAN_ADMIN dpplus | tail -n 1 | tr -d '\t')"
  1. Add the rest of the DP+ config to your CKAN config (e.g. /etc/ckan/default/ckan.ini):
# datapusher-plus settings
ckanext.datapusher_plus.use_proxy = false
ckanext.datapusher_plus.download_proxy = 
ckanext.datapusher_plus.ssl_verify = false
# supports INFO, DEBUG, TRACE - use DEBUG or TRACE when debugging scheming Formulas
ckanext.datapusher_plus.upload_log_level = INFO
ckanext.datapusher_plus.formats = csv tsv tab ssv xls xlsx xlsxb xlsm ods geojson shp qgis zip
ckanext.datapusher_plus.pii_screening = false
ckanext.datapusher_plus.pii_found_abort = false
ckanext.datapusher_plus.pii_regex_resource_id_or_alias =
ckanext.datapusher_plus.pii_show_candidates = false
ckanext.datapusher_plus.pii_quick_screen = false
ckanext.datapusher_plus.qsv_bin = /usr/local/bin/qsvdp
ckanext.datapusher_plus.preview_rows = 100
ckanext.datapusher_plus.download_timeout = 300
ckanext.datapusher_plus.max_content_length = 1256000000000
ckanext.datapusher_plus.chunk_size = 16384
ckanext.datapusher_plus.default_excel_sheet = 0
ckanext.datapusher_plus.sort_and_dupe_check = true
ckanext.datapusher_plus.dedup = false
ckanext.datapusher_plus.unsafe_prefix = unsafe_
ckanext.datapusher_plus.reserved_colnames = _id
ckanext.datapusher_plus.prefer_dmy = false
ckanext.datapusher_plus.ignore_file_hash = true
ckanext.datapusher_plus.auto_index_threshold = 3
ckanext.datapusher_plus.auto_index_dates = true
ckanext.datapusher_plus.auto_unique_index = true
ckanext.datapusher_plus.summary_stats_options =
ckanext.datapusher_plus.add_summary_stats_resource = false
ckanext.datapusher_plus.summary_stats_with_preview = false
ckanext.datapusher_plus.qsv_stats_string_max_length = 32767
ckanext.datapusher_plus.qsv_dates_whitelist = date,time,due,open,close,created
ckanext.datapusher_plus.qsv_freq_limit = 10
ckanext.datapusher_plus.auto_alias = true
ckanext.datapusher_plus.auto_alias_unique = false
ckanext.datapusher_plus.copy_readbuffer_size = 1048576
ckanext.datapusher_plus.type_mapping = {"String": "text", "Integer": "numeric","Float": "numeric","DateTime": "timestamp","Date": "date","NULL": "text"}
ckanext.datapusher_plus.auto_spatial_simplication = true
ckanext.datapusher_plus.spatial_simplication_relative_tolerance = 0.1
ckanext.datapusher_plus.latitude_fields = latitude,lat
ckanext.datapusher_plus.longitude_fields = longitude,long,lon
ckanext.datapusher_plus.jinja2_bytecode_cache_dir = /tmp/jinja2_butecode_cache
ckanext.datapusher_plus.auto_unzip_one_file = true

Also add this entry to your CKAN's resource_formats.json file for ckanext.datapusher_plus.formats to work as expected with tab files.

["TAB", "Tab Separated Values File", "text/tab-separated-values", []],

See the configuration section below for more information.

  1. Optionally add DRUF mode to your CKAN config:
# Enable DRUF (Dataset Resource Upload First) workflow for the DataPusher+ CKAN extension
ckanext.datapusher_plus.enable_druf = true
ckanext.datapusher_plus.enable_form_redirect = true
  1. Set up the database for datapusher_plus:
ckan -c /etc/ckan/default/ckan.ini db upgrade -p datapusher_plus
  1. If you get a ckan.logic.ValidationError reporting Missing value for multiple fields, you can temporarily add validators: ignore_missing to those fields in the YAML schema file used by ckanext-scheming. You may also need to set required: False.
  2. Make sure the FileStore is enabled to allow file uploads (the ckan.uploads_enabled setting is already present in your CKAN config; set it to true). You'll also need to update the FileStore storage permissions as per the docs. For example, replace the Linux username rzmk with your username in the following commands:
sudo chown rzmk /var/lib/ckan/default
sudo chmod -R u+rwx /var/lib/ckan/default
  1. Make sure you enable the Datastore plugin.
  2. In a separate terminal start the job queue:
ckan -c /etc/ckan/default/ckan.ini jobs worker

Configuring

CKAN Configuration

Add datapusher_plus to the plugins in your CKAN configuration file (generally located at /etc/ckan/default/ckan.ini):

ckan.plugins = <other plugins> datapusher_plus

Use a DP+ extended scheming schema:

scheming.dataset_schemas =  ckanext.datapusher_plus:dataset-druf.yaml

Configure DP+'s numerous settings. See config.py for details.

ckanext.datapusher_plus.use_proxy = false
ckanext.datapusher_plus.download_proxy = 
ckanext.datapusher_plus.ssl_verify = false
# supports INFO, DEBUG, TRACE - use DEBUG or TRACE when debugging scheming Formulas
ckanext.datapusher_plus.upload_log_level = INFO
ckanext.datapusher_plus.formats = csv tsv tab ssv xls xlsx xlsxb xlsm ods geojson shp qgis zip
ckanext.datapusher_plus.pii_screening = false
ckanext.datapusher_plus.pii_found_abort = false
ckanext.datapusher_plus.pii_regex_resource_id_or_alias =
ckanext.datapusher_plus.pii_show_candidates = false
ckanext.datapusher_plus.pii_quick_screen = false
ckanext.datapusher_plus.qsv_bin = /usr/local/bin/qsvdp
ckanext.datapusher_plus.preview_rows = 100
ckanext.datapusher_plus.download_timeout = 300
ckanext.datapusher_plus.max_content_length = 1256000000000
ckanext.datapusher_plus.chunk_size = 16384
ckanext.datapusher_plus.default_excel_sheet = 0
ckanext.datapusher_plus.sort_and_dupe_check = true
ckanext.datapusher_plus.dedup = false
ckanext.datapusher_plus.unsafe_prefix = unsafe_
ckanext.datapusher_plus.reserved_colnames = _id
ckanext.datapusher_plus.prefer_dmy = false
ckanext.datapusher_plus.ignore_file_hash = true
ckanext.datapusher_plus.auto_index_threshold = 3
ckanext.datapusher_plus.auto_index_dates = true
ckanext.datapusher_plus.auto_unique_index = true
ckanext.datapusher_plus.summary_stats_options =
ckanext.datapusher_plus.add_summary_stats_resource = false
ckanext.datapusher_plus.summary_stats_with_preview = false
ckanext.datapusher_plus.qsv_stats_string_max_length = 32767
ckanext.datapusher_plus.qsv_dates_whitelist = date,time,due,open,close,created
ckanext.datapusher_plus.qsv_freq_limit = 10
ckanext.datapusher_plus.auto_alias = true
ckanext.datapusher_plus.auto_alias_unique = false
ckanext.datapusher_plus.copy_readbuffer_size = 1048576
ckanext.datapusher_plus.type_mapping = {"String": "text", "Integer": "numeric","Float": "numeric","DateTime": "timestamp","Date": "date","NULL": "text"}
ckanext.datapusher_plus.auto_spatial_simplication = true
ckanext.datapusher_plus.spatial_simplication_relative_tolerance = 0.1
ckanext.datapusher_plus.latitude_fields = latitude,lat
ckanext.datapusher_plus.longitude_fields = longitude,long,lon
ckanext.datapusher_plus.jinja2_bytecode_cache_dir = /tmp/jinja2_butecode_cache
ckanext.datapusher_plus.auto_unzip_one_file = true
ckanext.datapusher_plus.api_token = <CKAN service account token for CKAN user with sysadmin privileges>
ckanext.datapusher_plus.describeGPT_api_key = <Token for OpenAI API compatible service>

and add this entry to your CKAN's resource_formats.json file.

["TAB", "Tab Separated Values File", "text/tab-separated-values", []],
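
To make the role of ckanext.datapusher_plus.type_mapping more concrete, here is a small Python sketch that parses the JSON value shown above and maps inferred column types to PostgreSQL types. The function `datastore_fields` and the sample columns are hypothetical, and falling back to text for an unmapped type is an assumption; the README does not describe how DP+ applies the mapping internally.

```python
import json

# The default value of ckanext.datapusher_plus.type_mapping, as shown in the config above.
TYPE_MAPPING = json.loads(
    '{"String": "text", "Integer": "numeric", "Float": "numeric", '
    '"DateTime": "timestamp", "Date": "date", "NULL": "text"}'
)


def datastore_fields(inferred_types, type_mapping=TYPE_MAPPING):
    """Turn {column: inferred type} into a list of {"id", "type"} field definitions."""
    return [
        # Assumption: an inferred type missing from the mapping is stored as text.
        {"id": column, "type": type_mapping.get(inferred, "text")}
        for column, inferred in inferred_types.items()
    ]


# Hypothetical inferred types for a three-column CSV.
fields = datastore_fields({"name": "String", "opened": "Date", "amount": "Float"})
print(fields)
```

With the default mapping, both Integer and Float columns become numeric; changing the "Integer" entry is one way to store integers in a narrower type.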

Usage

DataPusher+ attempts to load any file in one of the supported formats (defined in ckanext.datapusher_plus.formats) into the DataStore.

You can also manually trigger resources to be resubmitted. When editing a resource in CKAN (clicking the "Manage" button on a resource page), a new tab named "DataStore" will appear. This will contain a log of the last attempted upload and a button to retry the upload. Once a resource has been "pushed" into the Datastore, a "Data Dictionary" tab will also be available where the data publisher can fine-tune the inferred data dictionary.

Command line

Run the following command to submit all resources to DataPusher+. Resources whose data file hash has not changed are skipped:

ckan -c /etc/ckan/default/ckan.ini datapusher_plus resubmit

To resubmit a specific resource, whether or not the hash of the data file has changed:

ckan -c /etc/ckan/default/ckan.ini datapusher_plus submit {dataset_id}
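
The hash-based skip described above can be pictured with the following Python sketch. It is an assumption-laden illustration: `stream_hash` and `should_resubmit` are hypothetical names, SHA-256 is chosen only for the example (the README does not say which hash DP+ uses), and treating `ignore_file_hash` as "always resubmit" is a guess based on the name of the ckanext.datapusher_plus.ignore_file_hash setting.

```python
import hashlib
import io


def stream_hash(stream, chunk_size=16384):
    """Hash a binary stream in chunks so large files never need to fit in memory."""
    digest = hashlib.sha256()  # assumption: algorithm chosen for illustration only
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


def should_resubmit(stream, previous_hash, ignore_file_hash=False):
    """Resubmit when the content changed, or always when the hash check is ignored."""
    if ignore_file_hash:
        return True
    return stream_hash(stream) != previous_hash


# Usage: an unchanged file is skipped, a changed one is resubmitted.
old_hash = stream_hash(io.BytesIO(b"id,name\n1,a\n"))
print(should_resubmit(io.BytesIO(b"id,name\n1,a\n"), old_hash))
print(should_resubmit(io.BytesIO(b"id,name\n1,b\n"), old_hash))
```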

Prefect orchestration (v3.0+)

DataPusher+ v3.0 replaces the v2 RQ-based background worker with a Prefect 3 flow. RQ is no longer used by DP+ itself (CKAN continues to ship RQ for unrelated extensions).

Why Prefect

  • Customizable workflows — operators compose their own ingestion flows from DP+'s @task primitives without forking the code.
  • Graceful failure & recoverable ingestion — per-task retries with exponential backoff, transactional rollback of partial datastore writes, result persistence so the Prefect UI's "Re-run from failed task" replays only the failed and downstream tasks, content-based caching that skips re-downloading unchanged resources.
  • Observability — every flow run, every task, every retry, every log line is visible in the Prefect UI. Each successful run attaches a Markdown "Data Quality Report" artifact and emits a datapusher.resource.ingested event that operators can wire into Automations (Slack, PagerDuty, search reindex, DCAT refresh) without DP+ hard-coding a specific alerting backend.
  • Horizontal scaling — add Prefect workers on additional hosts to scale ingestion throughput. Tag-based concurrency limits cap concurrent Postgres COPY operations so a burst of submissions doesn't flatten the datastore.
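
For readers unfamiliar with the "retries with exponential backoff" pattern mentioned above, here is a generic, dependency-free Python sketch of the idea. It does not use Prefect and is not DP+'s code: `backoff_delays`, `retry_with_backoff`, the base delay and the retry count are all assumptions made for illustration.

```python
import time


def backoff_delays(retries, base_delay=1.0):
    """Delay before each retry: base_delay, then doubled every time."""
    return [base_delay * 2**attempt for attempt in range(retries)]


def retry_with_backoff(fn, retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on failure wait with growing delays, re-raising after the last retry."""
    delays = backoff_delays(retries, base_delay)
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the original error
            sleep(delays[attempt])


print(backoff_delays(4, base_delay=2.0))
```

The `sleep` parameter is injectable so the waiting can be replaced in tests; doubling the delay gives a struggling download source or database progressively more time to recover.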

Topology

+--------+      submit_flow_run      +----------------+      poll        +-----------------+
|  CKAN  | ------------------------> | Prefect server | <--------------- | Prefect worker  |
+--------+      datapusher_hook      |  (UI on 4200)  |     flow runs    | (runs the flow) |
    ^             POST callback      +----------------+                  +-----------------+
    |                                        |
    +----------------------------------------+
                       Postgres (shared)

The CKAN web request handler synchronously POSTs the run to the Prefect server and returns immediately with a flow_run_id. A separately-managed worker process executes the flow. Prefect 3's default Postgres-backed server is sufficient — no Redis required.

Quick start (dev / demo)

# 1. Bring up Prefect server + worker + Postgres
docker compose -f docker-compose.prefect.yaml up -d

# 2. Register the DataPusher+ deployment with the Prefect server
ckan -c /etc/ckan/default/ckan.ini datapusher_plus prefect-deploy

# 3. Submit a resource via the CKAN UI — you should now see the flow
#    run in the Prefect UI at http://localhost:4200

For production, run your own Prefect server (or use Prefect Cloud) and configure PREFECT_API_URL to point at it.

Worker lifecycle

The CKAN-side RQ worker (ckan jobs worker) is no longer used for DP+ jobs. Replace it with:

PREFECT_API_URL=http://prefect-server:4200/api \
CKAN_INI=/etc/ckan/default/ckan.ini \
prefect worker start --pool datapusher-plus

Scale horizontally by starting workers on additional hosts pointed at the same Prefect server and work pool. Workers must have the datapusher-plus Python package installed and CKAN configuration accessible (it's read at flow-run start for resource metadata and the qsv binary path).

Upgrading from v2.0

A one-shot CLI helper handles the cutover:

ckan -c /etc/ckan/default/ckan.ini datapusher_plus migrate-from-rq --resubmit

What it does:

  1. Drains any pending DP+ jobs from CKAN's RQ queue.
  2. Resets any pending task_status rows on the CKAN side so the UI no longer falsely shows in-flight ingestions.
  3. Verifies the configured Prefect server is reachable.
  4. With --resubmit, re-submits each affected resource_id through the new Prefect path.

After it finishes, stop your CKAN-side ckan jobs worker process (or leave it running if other extensions use RQ; DP+ jobs no longer flow through it), then start the Prefect worker with prefect worker start -p datapusher-plus.

Historical Jobs/Logs rows from the RQ era remain queryable through datapusher_status and the CKAN UI without modification — the v2 database schema needs no migration.

Observability features

Each successful flow run attaches:

  • A Data Quality Markdown artifact — row count, inferred schema, PII findings, quarantine count.
  • A CKAN resource link artifact — one-click jump back to the resource page in CKAN.
  • A datapusher.resource.ingested custom event with payload {rows, file_hash, duration_seconds} — wire this into Prefect Automations for downstream side effects.

The validation and PII tasks emit additional events (datapusher.row.quarantined, datapusher.pii.detected).

Two example automations ship in examples/automations/:

# Alert on 3 consecutive failures for the same resource within an hour
prefect automation create -f examples/automations/alert-on-consecutive-failures.json

# Alert on Crashed state (worker killed, OOM, infrastructure error)
prefect automation create -f examples/automations/alert-on-crashed.json

Replace REPLACE_WITH_NOTIFICATION_BLOCK_ID in those files with the ID of a notification Block (Slack, PagerDuty, email) you've registered in the Prefect UI before applying.

Custom flows

Operators compose their own ingestion flow by importing DP+'s @task primitives and registering the flow via config:

# my_plugin/flows.py
from prefect import flow
from prefect.transactions import transaction

from ckanext.datapusher_plus.jobs.prefect_flow import (
    download_task, format_convert_task, validate_task, analyze_task,
    database_task, indexing_task, formula_task, metadata_task,
)


@flow(name="my-custom-datapusher")
def my_custom_flow(job_input):
    # Download the resource, then convert it to a format DP+ can process.
    dl = download_task(job_input)
    cv = format_convert_task(dl)

    # ... insert your own redaction / enrichment / spatial tasks here ...
    vl = validate_task(cv)
    an = analyze_task(vl)

    # Run the datastore-writing steps inside a single Prefect transaction.
    with transaction():
        db = database_task(an)
        idx = indexing_task(db)
        fm = formula_task(idx)
        md = metadata_task(fm)

Then point DP+ at it in ckan.ini:

ckanext.datapusher_plus.prefect_flow = my_plugin.flows:my_custom_flow

…and re-run ckan datapusher_plus prefect-deploy. Submissions via the CKAN UI now invoke your flow. The existing IDataPusher.can_upload / after_upload plugin hooks continue to fire — custom flows are additive, not exclusive.

Configuration reference (new in v3.0)

| Key | Default | Purpose |
|---|---|---|
| ckanext.datapusher_plus.prefect_deployment_name | datapusher-plus/datapusher-plus | Fully-qualified Prefect deployment name (<flow>/<deployment>). |
| ckanext.datapusher_plus.prefect_work_pool | datapusher-plus | Work-pool name workers subscribe to. |
| ckanext.datapusher_plus.prefect_flow | (unset) | module.path:flow_name entrypoint of a custom flow. |
| ckanext.datapusher_plus.prefect_ui_base | (unset) | Base URL of the Prefect UI (e.g. http://prefect-server:4200). When set, datapusher_status returns a job_url that deep-links into the run page. |
| ckanext.datapusher_plus.flow_timeout | 7200 | Outer flow-run timeout in seconds. Replaces v2's ckan.datapusher.timeout. |
| ckanext.datapusher_plus.max_quarantine_pct | 5.0 | Maximum percentage of rows that may be quarantined before the flow fails. |
| ckanext.datapusher_plus.pii_review_threshold | 0 | When > 0 and PII screening detects this many sensitive fields, the flow suspends for human approval via the Prefect UI. |
| ckanext.datapusher_plus.result_storage_block | local-file-system/datapusher-plus-results | Prefect Block for task-result persistence. Swap to S3/GCS for multi-host worker pools. |
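To make the two threshold settings concrete, here is a rough sketch of the checks they describe. The function names are hypothetical, and the comparison operators are assumptions: it treats max_quarantine_pct as an inclusive allowance (exactly 5.0% still passes) and pii_review_threshold as "this many or more".

```python
def quarantine_pct(quarantined_rows: int, total_rows: int) -> float:
    """Percentage of rows quarantined; 0.0 for an empty file."""
    if total_rows <= 0:
        return 0.0
    return 100.0 * quarantined_rows / total_rows


def exceeds_quarantine_limit(
    quarantined_rows: int, total_rows: int, max_quarantine_pct: float = 5.0
) -> bool:
    """True when the flow should fail (assumed: strictly above the limit)."""
    return quarantine_pct(quarantined_rows, total_rows) > max_quarantine_pct


def needs_pii_review(sensitive_fields: int, pii_review_threshold: int = 0) -> bool:
    """True when the flow should suspend; a threshold of 0 disables review."""
    return pii_review_threshold > 0 and sensitive_fields >= pii_review_threshold
```

Under these assumptions, a 1,000-row file with 50 quarantined rows passes at the default limit, while 51 quarantined rows fails it.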

Worker-process env-var fallbacks (read when CKAN config isn't loaded in the worker):

  • DATAPUSHER_PLUS_FLOW_TIMEOUT_SECONDS
  • DATAPUSHER_PLUS_DOWNLOAD_RETRIES
  • DATAPUSHER_PLUS_DATABASE_RETRIES
  • DATAPUSHER_PLUS_CACHE_TTL_HOURS (default 24)
  • DATAPUSHER_PLUS_MAX_PERSIST_FILE_MB (default 512; 0 disables the cap)
  • DATAPUSHER_PLUS_MAX_QUARANTINE_PCT
  • DATAPUSHER_PLUS_RESULT_STORAGE_BLOCK
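As an illustration of the documented defaults, the sketch below reads two of these variables with the fallbacks listed above. It is not DP+'s own code: the helper names are hypothetical, unparseable values falling back to the default is an assumption, and so is treating 1 MB as 1024 * 1024 bytes.

```python
import os
from typing import Mapping, Optional


def env_int(name: str, default: int, env: Mapping[str, str] = os.environ) -> int:
    """Read an integer env var, using the default if unset or unparseable."""
    raw = env.get(name, "").strip()
    try:
        return int(raw)
    except ValueError:
        return default


def max_persist_bytes(env: Mapping[str, str] = os.environ) -> Optional[int]:
    """Size cap in bytes, or None when the cap is disabled with 0."""
    mb = env_int("DATAPUSHER_PLUS_MAX_PERSIST_FILE_MB", 512, env)
    return None if mb == 0 else mb * 1024 * 1024


def cache_ttl_seconds(env: Mapping[str, str] = os.environ) -> int:
    """Cache TTL in seconds, from the hours-based variable (default 24)."""
    return env_int("DATAPUSHER_PLUS_CACHE_TTL_HOURS", 24, env) * 3600
```

Passing a plain dict as env makes the helpers easy to exercise without touching the real environment.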

Prefect Variables (optional, highest priority): the flow looks up these Variable names at run start and uses the value when set. Set or change them from the Prefect UI (Variables tab) or with prefect variable set NAME VALUE; changes take effect on the next flow run without a worker restart.

| Variable | Overrides |
|---|---|
| dpp_flow_timeout | ckanext.datapusher_plus.flow_timeout / DATAPUSHER_PLUS_FLOW_TIMEOUT_SECONDS |
| dpp_download_retries | ckanext.datapusher_plus.download_retries / DATAPUSHER_PLUS_DOWNLOAD_RETRIES |

Resolution order: Prefect Variable -> env var -> ckan.ini -> built-in default. Variable lookup failures (Prefect server unreachable, name absent, value not int-parseable) silently fall through to the next source in that order, so operators with no Prefect Variables set see no behaviour change.
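The fall-through behaviour can be sketched as a chain of lookups tried in priority order. This is an illustration only: resolve_int is a hypothetical helper, and the sources are plain callables and dicts standing in for the Prefect Variable lookup, the process environment, and the parsed ckan.ini.

```python
from typing import Callable, Optional, Sequence

Source = Callable[[], Optional[str]]


def resolve_int(sources: Sequence[Source], default: int) -> int:
    """Return the first int-parseable value from sources, else the default."""
    for source in sources:
        try:
            raw = source()
            if raw is None:
                continue  # name absent: try the next source
            return int(raw)
        except Exception:
            # Unreachable server or non-integer value: fall through silently.
            continue
    return default


def unreachable_variable() -> Optional[str]:
    # Stand-in for a Prefect Variable lookup that cannot reach the server.
    raise ConnectionError("Prefect server unreachable")


env = {"DATAPUSHER_PLUS_FLOW_TIMEOUT_SECONDS": "3600"}
ini = {"ckanext.datapusher_plus.flow_timeout": "1800"}

timeout = resolve_int(
    [
        unreachable_variable,
        lambda: env.get("DATAPUSHER_PLUS_FLOW_TIMEOUT_SECONDS"),
        lambda: ini.get("ckanext.datapusher_plus.flow_timeout"),
    ],
    default=7200,
)
print(timeout)  # 3600
```

Here the Variable lookup fails, so the env var wins over ckan.ini; with both dicts empty the result would be the built-in default of 7200.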

Troubleshooting

| Symptom | Likely cause | Fix |
|---|---|---|
| datapusher_submit returns False with a Prefect connection error in the CKAN log | The Prefect server is unreachable from CKAN | Check PREFECT_API_URL and that the Prefect server is healthy at <API>/health. |
| Flow run sits in Scheduled forever | No worker is polling the configured work pool | Run prefect worker start -p datapusher-plus on a host with the datapusher-plus package installed. |
| Flow run goes straight to Failed with "QSV binary not found" | The worker process can't see the qsv binary | Set ckanext.datapusher_plus.qsv_bin in the CKAN config the worker reads, or install qsv in the worker's PATH. |
| Re-run from a failed task re-downloads the file | Result storage block isn't registered, so persisted results aren't being read | Re-run ckan datapusher_plus prefect-deploy, which calls ensure_result_storage_block. |
| Pre-existing partial table after a failed run | Database on_rollback couldn't drop it (e.g., Postgres unreachable at rollback time) | Drop manually with datastore_delete; the next submit recreates from scratch. |
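For the first symptom, the health check can be run from the CKAN host with a few lines of standard-library Python. This is a sketch: the function names are hypothetical, and it assumes a 200 response from <API>/health means the server is healthy.

```python
import os
import urllib.error
import urllib.request


def health_url(api_url: str) -> str:
    """Build <API>/health from the configured API base URL."""
    return api_url.rstrip("/") + "/health"


def prefect_is_healthy(api_url: str, timeout: float = 5.0) -> bool:
    """True if the health endpoint answers with HTTP 200 within the timeout."""
    try:
        with urllib.request.urlopen(health_url(api_url), timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, DNS failure, timeout, HTTP error, or bad URL.
        return False


if __name__ == "__main__":
    api = os.environ.get("PREFECT_API_URL", "")
    print(api, "healthy" if prefect_is_healthy(api) else "unreachable")
```

Run it as the same user and on the same host as CKAN, so that it sees the same PREFECT_API_URL and network path that datapusher_submit uses.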

License

This material is copyright (c) 2025, datHere, Open Knowledge Foundation and other contributors

It is open and licensed under the GNU Affero General Public License (AGPL) v3.0 whose full text may be found at:

http://www.fsf.org/licensing/licenses/agpl-3.0.html

Footnotes

  1. Why use qsv instead of a "proper" python data analysis library like pandas?

  2. It takes 0.16 seconds with an index to run qsv stats against the qsv whirlwind tour sample file on a Ryzen 4800H (8 physical/16 logical cores) with 32 GB of memory and a 1 TB SSD. Without an index, it takes 1.3 seconds.

  3. Imagine you have a 1M row CSV, and the last row has an invalid value for a numeric column (e.g. "N/A" instead of a number). After spending hours pushing the data very slowly, legacy datapusher will abort on the last row and the ENTIRE job is invalid. Ok, that's bad, but what makes it worse is that the old table has been deleted already, and Datapusher doesn't tell you what caused the job to fail! YIKES!!!!
