# Sections

Language

Parts of this documentation are generated automatically from the source code and are therefore only available in English.

An ETL pipeline is a chain of several so-called "sections". The processed items are passed on from one section to the next. Each section has a specific task: it processes and modifies individual items before they are passed on to the next section. A section can also turn a single item into several items, or drop items entirely.

The ETL pipeline consists of three kinds of work:

  • "E" for "Extract": data is extracted from a source and fed into the pipeline
  • "T" for "Transform": data is transformed fully automatically according to defined rules
  • "L" for "Load": data is loaded into / stored in the digital reading room, i.e. published.

To use a section in a pipeline, a JSON object is defined that contains at least a section key, plus further optional entries.

  • section: technical name of the section (e.g. AttributeMapper)
  • title (optional): title that is displayed instead of the section name and better explains what this step does.
  • comment (optional): description that helps with understanding and documenting the pipeline.
  • options (optional): configuration options for the section.

Example:

{
  "section": "Map",
  "title": "Entstehungszeitraum",
  "comment": "Der Entstehungszeitraum wird anhand des Feldes 'Datum' generiert...",
  "options": []
}

# AttributeMapper

Expects an option "mapping" as a list of dicts. Each dict represents an attribute mapping and has to contain the keys "source" (the key of the source attribute) and "target" (the new key of the attribute).

Only attributes listed in the "mapping" option are kept; all others are discarded (this behavior is controlled by the cleanup option described below).

Example:

{
  "section": "AttributeMapper",
  "options": {
    "mapping": [
      {
        "source": "$TITLE",
        "target": "title"
      },
      {
        "source": "$ID",
        "target": "legacy_id"
      },
      {
        "source": "$data.signature",
        "target": "signature"
      },
      {
        "source": "$data.history",
        "target": "history",
        "default": ""
      },
      {
        "source": "$data.format",
        "target": "format",
        "omit_empty": true
      }
    ]
  }
}

When source does not exist on an item and no default is defined, the pipeline is aborted with an error. If a default value is defined, the target will be set to the default value in case the source does not exist on the item.

When omit_empty is set to true, the target key is removed from the item when the source key does not exist or the value is one of "", null, [], (), {}.

In order to create custom fields on the fly, use the custom flag in the mapping item. It's best to also define labels in the required languages, so that the item is properly displayed later on the website.

When defining custom fields, it is possible to set a protected flag (see the example below for reference). The protected flag has the effect that this particular custom field is not published to the website (Solr) and is only stored in the internal systems. A use case for this feature is displaying protected data, such as personally identifiable information (PII), to authorized personnel in the orders management interface, while still publishing the record as a whole to the public without this specific metadata field and its potentially sensitive contents. Custom fields are not protected by default.

Example:

{
  "section": "AttributeMapper",
  "options": {
    "mapping": [
      {
        "custom": true,
        "source": "$GESCHICHTE",
        "label_de": "Geschichte",
        "label_fr": "Histoire",
        "label_en": "History",
        "display_group": 2,
        "slug": "history"
      },
      {
        "custom": true,
        "protected": true,
        "source": "$SCHUTZFRISTNOTIZ",
        "slug": "schutzfristnotiz",
        "label_de": "Schutzfristnotiz"
      }
    ]
  }
}

# Options

  • cleanup - Cleanup fields

    • When true, fields that are not listed in the attribute mapping are removed from the item.
    • Type: Boolean
    • Default: true
  • mapping - Field mapping

    • The field mapping is a list of dicts, each mapping one field value.
    • Type: List (AttributeMapping)
      • custom - Custom field

        • When true, the value is prepared to be imported as a custom field. When used, target must not be set; set slug instead.
        • Type: Boolean
        • Default: false
      • default - Default value

        • Default value which is set when source is not set in the item.
        • Type: JSON
      • display_group - ISAD(G) group number

        • Can be used in combination with custom=true in order to define in which ISAD(G) display group the custom value will be displayed in the default metadata view.
        • Type: Choice
        • Choices:
          • 1: Identity Statement ("Identifikation")
          • 2: Context ("Kontext")
          • 3: Content and Structure ("Inhalt und innere Ordnung")
          • 4: Conditions of Access and Use ("Zugangs- und Benutzungsbestimmungen")
          • 5: Allied Materials ("Sachverwandte Unterlagen")
      • label_de - German label

        • Type: Char
      • label_en - English label

        • Type: Char
      • label_fr - French label

        • Type: Char
      • omit_empty - Omit empty

        • When the value is missing in the item and omit_empty=true, the value is removed completely from the result.
        • Type: Boolean
      • protected - Protected custom value

        • In combination with custom=true, setting protected=true excludes the custom field values from the data sent to the website / external systems.
        • Type: Boolean
      • slug - Customfield Slug

        • The slug is the identifier of the custom field. It can be used optionally in combination with custom=true in order to make the field identifiable in other configs such as for the Web.
        • Type: Char
      • source - Source fieldname or expression

        • The source of the value within the item can be configured with a simple field name (e.g. title) or with an expression (e.g. $title or $metadata.title).
        • Type: Char
      • target - Target fieldname

        • The target must be configured when mapping to a product field or when the attribute mapper is used for renaming / filtering data. When the target is used, it simply sets the value with this fieldname.
        • Type: Char
      • value - Value to map

        • The value option can be used instead of the source option for the ability to use expressions when mapping attributes.
        • Type: Expression

# Compress

The compress section allows for compressing a single file with gzip.

In order for this section to work, a previous section (such as JSONWriter) must have written a file to the temp directory of the ETL worker and yielded the path as an item in the pipeline.

The Compress section then compresses the file on disk (adding the suffix .gz) and replaces the path in the item, which is yielded further. The original file is not kept.

Example:

{
  "section": "Compress",
  "options": {
    "path_field": "path",
    "level": 7
  }
},

Input:

{"path": "/tmp/etl/json_writer/data.json"}

Output:

{"path": "/tmp/etl/json_writer/data.json.gz"}

The compressed file is automatically removed by the section after it is processed by the later sections.

# Options

  • level - Compression level

    • The compression level for the gzip tool as a number from 1 (fast) to 9 (best).
    • Type: Integer
    • Default: null
  • path_field - Path field

    • Name of the field in the consumed item containing the path to the file on the disk.
    • Type: Char
    • Default: 'path'
  • result_path_field - Resulting path field

    • Name of the field where the resulting path of the compressed file is written to. By default (null) this matches the path_field so that the path is overwritten.
    • Type: Char
    • Default: null

# CreateContainerRecordRelations

Creates or updates (based on their "identifier") container record relations for items on the pipeline.

Example:

{"section": "CreateContainerRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateContainers

Creates or updates (based on their "identifier") containers for items on the pipeline.

Example:

{"section": "CreateContainers"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDeliveries

Creates or updates (based on their "identifier") deliveries for items on the pipeline.

{"section": "CreateDeliveries"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDeliveryRecordRelations

Creates or updates (based on their "identifier") delivery record relations for items on the pipeline.

{"section": "CreateDeliveryRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDescriptorRecordRelations

Creates or updates (based on their "identifier") descriptor record relations for items on the pipeline.

{"section": "CreateDescriptorRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDescriptorRelations

Creates or updates (based on their "identifier") descriptor relations for items on the pipeline.

{"section": "CreateDescriptorRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDescriptors

Creates or updates (based on their "identifier") descriptors for items on the pipeline.

{"section": "CreateDescriptors"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateEnumEntries

Creates or updates enum entry objects. For updating, the unique combination of "enum" and "identifier" is used.

Configuration example:

{"section": "CreateEnumEntries"}

Accepts items in this form:

{
    "enum": "archival_type",
    "identifier": "pic",
    "order": 1,
    "label_de": "Bild",
    "label_en": "Picture",
    "label_fr": "Image",
    "label_it": null,
    "data": {"important": true}
}

Keys:

  • enum (required) contains the identifier of the enum, which usually matches the field name.
  • identifier (required) contains the identifier of the entry in the enum, which must be unique per enum.
  • order (optional) integer value for ordering.
  • label_* (optional) labels translated in each supported and enabled language, such as de, en, fr, it, rm.
  • data (optional): JSON field for storing additional structured data in this record.

prune_enum Option: The prune_enum option makes it possible to delete and replace a whole set of enum entries with the same enum value without changing other existing enums.

This is done by removing all existing entries for an enum on the first appearance of an item in the pipeline with this enum name. Enum entries which have an enum name that does not appear in the pipeline are kept.
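
A minimal configuration sketch with the prune_enum option enabled (the option is documented below) might look like this:

{"section": "CreateEnumEntries", "options": {"prune_enum": true}}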

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false
  • prune_enum - Prune enum

    • Delete all enum entries of newly imported enums.
    • Type: Boolean
    • Default: false

# CreateIIIFSources

Creates or updates (based on their "id") file objects for items on the pipeline. Prunes all existing relations before re-creating them.

ℹ️ The name CreateIIIFSources exists for historical reasons. Nowadays, the section is used for all kinds of files.

{"section": "CreateIIIFSources"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateLocations

Creates or updates (based on their "identifier") locations for items on the pipeline.

Example:

{"section": "CreateLocations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreatePackages

Creates or updates (based on their "identifier") packages for items on the pipeline.

Example:

{"section": "CreatePackages"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateRecordRelations

Creates or updates (based on their "identifier") record relations for items on the pipeline.

Example:

{"section": "CreateRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateRecords

Creates or updates (based on their "identifier") records for items on the pipeline.

{"section": "CreateRecords"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CsvFileReader

Accepts a list of strings denoting the absolute paths on the file system. Reads each CSV file in that list and returns a dict for each row.

Example:

{"section": "CsvFileReader",
 "options": {"columns": ["ID_NR"]}}

# Options

  • columns - Columns
    • A list of column names to add to the pipeline. If empty, all columns are added.
    • Type: List
    • Default: []

# Deduplicate

Deduplicate any items by the value of a specific field.

In some cases, we need to deduplicate items when they appear twice, identified by a specific fieldname. This section does exactly that.

Example:

{
  "section": "Deduplicate",
  "options": {
    "field": "identifier"
  }
}

# Options

  • field - Field name required
    • Name of the field of the unique value which is used for deduplicating items that have the very same value in this field.
    • Type: Char

# DeleteItems

Takes a list of items, each expected to contain the key configured as identifier_field. All entries of the model configured in model whose lookup_field matches one of these identifier values are deleted. Yields the initial list again.
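
A configuration sketch, assuming records should be deleted whose identifier matches the value stored in a (hypothetical) legacy_id field of the items:

{
  "section": "DeleteItems",
  "options": {
    "model": "record",
    "identifier_field": "legacy_id",
    "lookup_field": "identifier"
  }
}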

# Options

  • model - Model name required

    • Name of the model to delete from, all in lowercase characters.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
  • batch_size - Batch size

    • Number of items to delete in one batch.
    • Type: Integer
    • Default: 100
  • identifier_field - Identifier field name

    • Name of the field on the item in the pipeline, containing the value to look for in lookup_field.
    • Type: Char
    • Default: 'identifier'
  • lookup_field - Model field name

    • Name of the field on the model used to look up the values from identifier_field.
    • Type: Char
    • Default: 'identifier'

# ExcelReader

Accepts a list of strings denoting the absolute paths on the file system. Reads each Excel file in that list and returns a dict for each row.

Example:

{"section": "ExcelReader",
 "options": {"columns": ["ID_NR"]}}

Renaming columns: In the ETL pipeline, the keys should be technical. Characters such as . or , can break certain sections in the pipeline. In such cases, the fields can simply be renamed.

{
  "section": "ExcelReader",
  "options": {
    "columns": ["Inventarnr. verwandte Objekte"],
    "rename_columns": {"Inventarnr. verwandte Objekte": "inventarnr_verwandte_objekte"}
  }
}

# Options

  • columns - Column names

    • A list of column names to preserve. If empty, all columns are preserved.
    • Type: List
    • Default: []
  • rename_columns - Rename columns

    • Mapping of columns to rename, where the key is the string name of the excel column and the value is the new name it should have.
    • Type: Dict
    • Default: {}
  • sheet - Sheet name

    • The name of the excel sheet to read from. When not set, the first sheet is read.
    • Type: Char
    • Default: null

# Expand

The Expand section is used for repeating an item.

With this section, an item can be repeated for each value of a list stored in one of its properties.

Example Pipeline definition:

{
  "section": "Expand",
  "options": {
    "fields": ["relation_target"]
  }
}

Input:

[
  {"relation_source": 1, "relation_target": [2, 3, 4]}
]

Output:

[
  {"relation_source": 1, "relation_target": 2},
  {"relation_source": 1, "relation_target": 3},
  {"relation_source": 1, "relation_target": 4}
]

When one of the fields is missing or its value is empty or falsy, the item is not passed on.
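
The split and strip options (documented below) can be used to expand a delimited string value. A sketch with an assumed descriptors field:

{
  "section": "Expand",
  "options": {
    "field": "descriptors",
    "split": ";",
    "strip": " "
  }
}

With this configuration, an input item such as {"descriptors": "Foo; Bar"} would presumably be expanded into {"descriptors": "Foo"} and {"descriptors": "Bar"}.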

# Options

  • field - Field name

    • Name of the field that contains a list by which the items should be expanded. field and fields are mutually exclusive.
    • Type: Char
    • Default: null
  • fields - Field names

    • List of field names to expand at the same time. Each field must have the same amount of values. field and fields are mutually exclusive.
    • Type: List
    • Default: null
  • split - Split by character

    • Split the string value of the fields by this character.
    • Type: Char
    • Default: null
  • strip - Strip characters

    • Strips the characters defined in this field in all the values of the expanded lists.
    • Type: Char
    • Default: null
  • target - Target field name

    • Name of the field that the expanded field value should be stored in. When not defined, the field is replaced in place and the original value is dropped. target cannot be combined with fields.
    • Type: Char
    • Default: null

# ExtractZip

The ExtractZip section allows extracting ZIP files that were placed in the pipeline by a previous section such as FileSource.

It expects an item with a path to a ZIP file and yields an item with a path for each file in the ZIP after extraction.

Example item:

{
    "path": "/tmp/etl/zip/foo.txt"
}
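
A minimal configuration sketch (the section has no options):

{"section": "ExtractZip"}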

# Options

# FileDeduplicator

The FileDeduplicator section reduces the number of processed files by skipping duplicated images.

The use case is that when one specific record has multiple files, containing the original (which we assume to be a TIF) and lower quality preview copies (which we assume to be JPG), we only want to keep the TIF.

Example:

{
  "section": "FileDeduplicator",
  "options": {
    "filename_field": "DATEINAME",
    "record_identifier_field": "ID_NR_VRZNG_ENHT"
  }
}

# Options

  • filename_field - Filename field name

    • The name of the field containing filenames or paths. The value of the field is used for identifying duplicates based on the filename stem and for identifying the extension for selecting the best version of the image.
    • Type: Char
    • Default: 'DATEINAME'
  • record_identifier_field - Record identifier field name

    • The name of the field containing the identifier of the record.
    • Type: Char
    • Default: 'ID_NR_VRZNG_ENHT'

# FileDownloader

The FileDownloader section downloads files from a source system and stores them on a temporary volume for further processing by other sections and services.

The section works in combination with a file-downloader service, which runs in a separate container. The source system must be configured properly in the file downloader section.

Source types:

  • HTTP / HTTPS (http): The file downloader service can download files from an HTTP server.
  • S3 (s3): Download the files from an S3 service, supporting S3 authentication configured in the service.
  • Fedora Repository (fedora): Downloads an AIP package (jar file) from a Fedora Repository service, supporting authentication configured in the service. The AIP archive is then ZIP-extracted and the files are processed later by the publisher section.

Path/URL replacing:

In order to create an absolute path that works on the source system, based on a URL or filesystem path, we often have to replace parts of the string. The section has built-in functionality for doing that based on regular expressions.

The example below shows a configuration for this conversion:

  • input: file:///Q:/2018-00_P-S&N/STA_H_43_54/STA_H_43_54.xml
  • output: /2018-00_P-S&N/STA_H_43_54/STA_H_43_54.xml

Example:

{
  "section": "FileDownloader",
  "options": {
    "identifier_field": "ID_NR_DATEI",
    "source_path_field": "URL",
    "source_path_regexp_replace": [
      "file:///[A-Z]:(.*)",
      "\\1"
    ]
  }
}

This section's output is meant to be consumed by a FilePublisher section later. The section especially yields the attributes public_path and temp_files.

Giving URLs as input:

The section can also be used to download files from an HTTP server. In this case, the source_type option must be set to http. The source_path_field has to point to a field containing the URL in the previous section's output.

The config_name can be omitted in this case.

The file might not have an identifier since it is not coming from a database. To generate a unique identifier, use the Map section with the generate_identifier function.
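
A sketch of such a Map step, assuming the URL is stored in a field named URL and the generated hash should be stored in identifier:

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": ["$URL"],
        "target": "identifier",
        "function": "generate_identifier"
      }
    ]
  }
}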

Example:

{
  "section": "FileDownloader",
  "options": {
    "source_type": "http",
    "identifier_field": "identifier",
    "source_path_field": "URL"
  }
}

# Options

  • source_path_field - Source path field name required

    • Name of the field containing a Path or URL with the path and name of the file on the source system. The path extracted from here is expected to be usable as path in the URL on the source system.
    • Type: Char
  • config_name - File downloader source config name

    • Name of the file downloader source config that was preconfigured when installing the system. The available config names differ from installation to installation.
    • Type: Expression
    • Default: null
  • filename - Filename

    • When used, this filename is set after downloading the file. This is helpful when the URL does not contain a proper filename, which is necessary for extension-based file type detection.
    • Type: Expression
    • Default: null
  • identifier_field - Identifier field name

    • Name of the field containing the unique identifier of the file on the source system.
    • Type: Char
    • Default: 'ID_NR_DATEI'
  • purge_on_start - Purge downloaded files on start

    • When enabled, the downloads directory is cleared on start, so that potential leftovers from a previous, failed run are removed. When using multiple FileDownloader sections in a pipeline, this may need to be disabled.
    • Type: Boolean
    • Default: true
  • source_path_regexp_replace - Source path regexp replace list

    • A search/replace regexp pattern, represented as a list of two strings, the first one containing a search pattern, the second a replace pattern.
    • Type: List
  • source_type - Source type

    • The type of the source system.
    • Type: Choice
    • Choices:
      • s3: S3 (Default)
      • http: HTTP / HTTPS
      • fedora: Fedora Repository

# FilePublisher

The file publisher section publishes files to the public S3 bucket so that they can be displayed on the web.

The pipeline must be configured so that files are only fed to this section when their records are public (is_public) and the files of those records may also be public (files_are_public).

In order for the file publisher section to work properly, a set of publisher services running in separate containers is required. Depending on the file type, these services have resource-intensive tasks to do, such as preparing the images for a IIIF viewer.

This section is expected to be used in combination with the FileDownloader section, which adds items to the pipeline containing paths to the files on the shared temporary volume. It especially requires the attributes public_path and temp_files on the items.

Pipeline version:

File publishing is in general a slower task compared to only processing metadata. Therefore, it is important to have a mechanism for identifying situations where the work is already done and does not need to be redone.

This is implemented with a "pipeline version" feature. The pipeline version is an arbitrary version number that we define for the current configuration of the pipeline and the worker services. When we rerun the same pipeline, files that were already processed with the currently configured pipeline version are skipped. In this case, the section aborts the file processing and removes the file from the pipeline, since it has already been processed.

Whitelist and Blacklist:

In this section, you can filter files using a whitelist and blacklist configuration which are lists of file extensions. If the whitelist is enabled, only files with the specified extensions will be processed. If the blacklist is enabled, files with the specified extensions will not be processed. If both are enabled, the blacklist will take priority over the whitelist.

For example, if you want to process only PDF files, you can set the whitelist as follows:

{
  "section": "FilePublisher",
  "options": {
    "whitelist": ["pdf", "pdfa"]
  }
}

The file extensions are always compared in lowercase.

Example:

{
  "section": "FilePublisher",
  "options": {
    "pipeline_version": "17.2"
  }
}

# Publisher arguments

The publisher arguments are passed to the publisher services. The arguments are grouped by the media type of the files. The available arguments are:

  • image: as_thumbnail (default: false): If set to true, the image will not be processed by the image publisher service. Only the thumbnail will be created and uploaded.

Example:

{
  "section": "FilePublisher",
  "options": {
    "publisher_args": {
      "image": {
        "as_thumbnail": true
      }
    }
  }
}

The publisher arg values support expressions, which have access to item and file:

{
  "section": "FilePublisher",
  "options": {
    "publisher_args": {
      "image": {
        "as_thumbnail": "{{ item.thumbnail_only }}"
      }
    }
  }
}

# Options

  • blacklist - Blacklist

    • If set, file extensions that are included in the list are not processed.
    • Type: List
  • package_processing - Package processing

    • If enabled, the section will process packages (e.g. AIPs). If disabled, package files are processed individually.
    • Type: Boolean
    • Default: false
  • pipeline_version - Pipeline version

    • The version of the pipeline configuration. Files that were already processed with this specific version are skipped in future runs.
    • Type: Char
  • publisher_args - Publisher args

    • Additional arguments that are passed to the publisher services. The key is the media_type (e.g. image), the value is a dict where the key is the argument name and the value is either a raw value or an expression, which has access to item and file.
    • Type: Dict
    • Default: {}
  • whitelist - Whitelist

    • If set, only file extensions included in the list are processed.
    • Type: List

# FileSource

The FileSource section allows to manually upload a file for processing.

The file could be an Excel file that is read and processed by the ExcelReader section. The section is not meant to be used for automatic / nightly runs, since the user has to upload a file manually when the pipeline is executed.

The section yields items with paths:

[
    {
        "path": "/path/to/uploaded-file.xlsx"
    }
]
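
A sketch of how the section might be combined with the ExcelReader section mentioned above (the column name is illustrative):

[
    {"section": "FileSource"},
    {
        "section": "ExcelReader",
        "options": {"columns": ["ID_NR"]}
    }
]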

# Options

# Filter

Filter items by property values.

The filter section allows dropping items from the pipeline based on basic value comparisons. Simple comparison operators can be used, as well as and, or and not.

All expressions are represented as a simple list.

Keep items with a truthy field value:

{"keep": ["$is_public"]}

Using comparison operators:

{"keep": ["$is_public", "==", true]}
{"keep": ["$is_public", "!=", false]}
{"keep": ["$size", ">=", 10]}
{"keep": ["$title", "is not", null]}

Using and / or expressions:

{"keep": ["or",
          ["$schutzfrist_in_vergangenheit"],
            ["and",
            ["$status", "==", "Abgeschlossen"],
            ["$auf_dem_portal_sichtbar", "==", "Schutzfrist ignorierend"]]]}

Using not:

{"keep": ["and",
          ["$schutzfrist_in_vergangenheit"],
          ["not", ["$status", "==", "Abgeschlossen"]]]}

# Options

  • keep - Keep conditions required
    • Contains a condition definition. All items for which the condition has a truthy result are kept in the pipeline. Items that do not meet the condition are dropped from the pipeline.
    • Type: JSON

# HTTP

The HTTP source section performs an HTTP request against any API endpoint and provides the response payload as a temporary file for processing in a subsequent section.

Example:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "params": {
            "query": "Test"
        }
    }
}

This introduces an item into the pipeline in the following format:

{
    "path": "/tmp/etl/http/response-body-f34ds7fjlk",
    "status_code": "200",
    "reason": "OK",
    "headers": {
        "Content-Type": "application/json"
    }
}

The next section can then access the file at this path and process it further.

# Custom Request Headers

Custom request headers can be configured using the headers option:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "headers": {
            "Accept": "application/json"
        }
    }
}

# Basic Authentication

Some APIs require basic authentication. This can be configured as a list of username and password in the auth option:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "auth": ["api-user", "secret"]
    }
}
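
Instead of plaintext credentials, environment variables can be referenced via $env as described for the auth option below. A sketch assuming an environment variable ETL_ENV_API_PASSWORD is configured on the worker:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "auth": ["api-user", "$env.API_PASSWORD"]
    }
}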

# OAuth

The section supports the OAuth Backend Application flow to obtain an access token that is used to access an API. This can be configured using the oauth option:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "oauth": {
            "client_id": "api-client-id",
            "client_secret": "api-client-secret",
            "token_url": "https://example.com/api/accesstoken"
        }
    }
}

# foreach

The section supports operating on previous items in the pipeline using the foreach option. It calls url for each item yielded by the previous section and stores the result of the request in the target_path of the item.

Expressions can be used to build the url based on attributes of the item.
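
A configuration sketch, assuming each item from the previous section carries an id attribute and the response should be stored under a details key (the expression syntax mirrors the examples in the FilePublisher section):

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items/{{ item.id }}",
        "foreach": {
            "target_path": "details"
        }
    }
}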

# Handling Response Codes

The section raises an exception when the response code falls outside the successful "2xx" range, i.e. when the server responds with a client error ("4xx") or a server error ("5xx").

# Pagination Capability

The HTTP section offers support for paginating the requested API endpoint, provided that the response body is compatible. To enable pagination, the responding API must specify the content-type header as one of the supported content types (detailed below).

For each subsequent request, this section will generate and insert an item with a payload file path. It's important to note that this approach impacts the total number of requests in the pipeline run stats. This is because, in this scenario, it's impossible to predict the exact number of requests that will be made during pagination.

# Supported Content Types

In order to obtain the necessary information for pagination from the response body, the response body must be parsed and compatible with the supported content types. All pagination variants are compatible with these content types.

# JSON documents

When working with JSON payloads, you can select specific values using the JSON Pointer syntax. For more details, please refer to the JSON Pointer documentation (opens new window).

Content types:

  • application/json
  • application/ld+json
# XML documents

For XML documents, values can be extracted using XPath (opens new window).

Content types:

  • application/xml
  • text/xml

# Paginators

# Cursor-Based Paginator

The cursor_pagination method operates by expecting a cursor value in the response, which serves as a reference to the next page. In this method, the cursor should accurately represent the next page to be fetched.

Here's an example payload structure:

{
    "data": {
        "programmePage": {
            "cursor": "MSBnZWdlbiAxMDAgLSBTb21tZXJzcGVjaWFs",
            "edges": [
                {},
                {}
            ]
        }
    }
}

To configure the paginator for this method, you can use the following settings:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "cursor_pagination": {
            "cursor_path": "/data/programmePage/cursor",
            "cursor_param": "cursor"
        }
    }
}

By specifying these configurations, the section will effectively manage pagination using the cursor values found in the response, ensuring the retrieval of subsequent pages of data.

# Next URL-Based Paginator

The next_pagination method is based on extracting a fully qualified URL from the response document to determine the URL for the next page. For instance, if the requested API provides a payload with a "next" URL like this:

{
    "pagination": {
        "next": "http://api.server.com/items?page=2"
    },
    "results": [{"item": 1}]
}

You can extract the next URL using a JSON Pointer expression, as demonstrated in the following configuration:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "next_pagination": {
            "next_url_path": "/pagination/next"
        }
    }
}

This approach allows the section to dynamically determine and follow the next page URL provided in the response, facilitating effective pagination.

# Offset-Based Paginator

The offset_pagination method facilitates pagination based on a specified starting index, referred to as the offset. In a scenario where a page contains, for instance, 50 items, the initial request would use a start parameter set to 0, and subsequent requests would increment the start value to 50, or alternatively, based on the API's indexing system, potentially 51.

To make this approach work effectively, the requested API must provide the following values in the response:

  • offset: Denoting the index of the first element in the current page.
  • limit: Indicating the maximum number of items on the current page.
  • total: Representing the total number of items across all pages.

The section performing the pagination will cease when any of these values is missing, or when the calculated offset for the next page exceeds the total items available.

Here is an example of a payload structure:

<root start="51" page_size="50" total="237">
    <items>
        ...
    </items>
</root>

To configure the paginator for this method, you can use the following settings:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "offset_pagination": {
            "offset_path": "/root/@start",
            "offset_param": "start",
            "limit_path": "/root/@page_size",
            "limit_param": "size",
            "total_path": "/root/@total"
        }
    }
}

This configuration allows the section to manage pagination effectively based on the offset values provided in the response, ensuring the retrieval of all relevant data.

# Page-Based Paginator

The page_pagination method operates by simply incrementing a page number in the request parameter, assuming that the first page is numbered as 1.

To determine when to stop, the configuration should include the items_path, which enables the paginator to identify whether items have been returned in the response or not.

Here's an example of a payload structure:

{
    "data": [
        {},
        {}
    ]
}

To configure the paginator for this method, you can use the following settings:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "page_pagination": {
            "page_param": "page",
            "items_path": "/data"
        }
    }
}

With these configurations, the section will effectively manage pagination by incrementing the page number and checking for the presence of items in the response, ensuring the retrieval of subsequent pages of data.

# Options

  • url - URL required

    • The URL which should be requested.
    • Type: Expression
  • auth - Authentication

    • List of username and password in plaintext, submitted as basic authentication. Environment variables can be referenced using $env. Example: $env.EXAMPLE_SECRET uses the value of the environment variable ETL_ENV_EXAMPLE_SECRET.
    • Type: List
    • Default: null
  • cleanup - Cleanup files

    • When enabled, the temporary payload file is automatically removed after passing on the item in the pipeline.
    • Type: Boolean
    • Default: true
  • cookies - Cookies

    • Dict of cookies to send with the request.
    • Type: Dict
    • Default: null
  • cursor_pagination - Cursor pagination

    • Type: Dict
    • Default: null
      • cursor_param - cursor query param name required

        • The query param name to use for the cursor value in the next request.
        • Type: Char
      • cursor_path - Path to the cursor value required

        • The cursor value must contain the cursor for the current page.
        • Type: Char
  • foreach - Foreach

    • Type: Dict
      • target_path - Path to store the result required
        • Path to the key on the item in the pipeline where the result will be stored. "" can be used to replace the entire item.
        • Type: Char
  • headers - Headers

    • Request headers to be sent.
    • Type: Dict
    • Default: null
  • json - JSON Payload

    • A JSON serializable Python object to send in the body of the Request.
    • Type: JSON
    • Default: null
  • method - HTTP Method

    • The HTTP method with which the requests should be made.
    • Type: Choice
    • Default: 'GET'
    • Choices:
      • GET: GET
  • next_pagination - Next pagination

    • Type: Dict
    • Default: null
      • next_url_path - Next URL path required
        • Path to the URL of the next page in the response document.
        • Type: Char
  • oauth - Oauth

    • Type: Dict
      • client_id - Client ID required

        • Environment variables can be referenced using $env. Example: $env.EXAMPLE_SECRET uses the value of the environment variable ETL_ENV_EXAMPLE_SECRET.
        • Type: EnvChar
      • client_secret - Client Secret required

        • Environment variables can be referenced using $env. Example: $env.EXAMPLE_SECRET uses the value of the environment variable ETL_ENV_EXAMPLE_SECRET.
        • Type: EnvChar
      • token_url - URL used to get an access token required

        • Type: Char
  • offset_pagination - Offset pagination

    • Type: Dict
    • Default: null
      • limit_path - Path to the limit value required

        • The limit value must contain the maximum amount of items on one page.
        • Type: Char
      • offset_param - Offset query param name required

        • The query param name to use for the offset value in the next request.
        • Type: Char
      • offset_path - Path to the offset value required

        • The offset value must contain the index of the first element of the current page.
        • Type: Char
      • total_path - Path to the total value required

        • The total value must contain the total amount of items across all pages.
        • Type: Char
      • limit_param - Limit query param name

        • The query param name with which the limit is sent in the next request. This is optional. When null, the query param is omitted.
        • Type: Char
        • Default: null
  • page_pagination - Page pagination

    • Type: Dict
    • Default: null
      • items_path - path to the items required

        • When there are no items, the pagination is stopped.
        • Type: Char
      • page_param - page query param name required

        • The query param name to use for the page value in the next request.
        • Type: Char
  • params - Query Params

    • Request params to be sent in the query string.
    • Type: Dict
    • Default: null
  • timeout - Timeout

    • How many seconds to wait for the server to send data before giving up as a float.
    • Type: Float
    • Default: null
  • verify - Verify TLS certificate

    • Control whether we verify the server's TLS certificate.
    • Type: Boolean
    • Default: true

# InvokePipeline

The InvokePipeline section invokes an existing pipeline by passing the pipeline_id in the options. Multiple InvokePipeline sections can be chained one after another to process the same items with different pipelines. In the following example, two pipelines are invoked with the output of the CsvFileReader.

Example with input mode:

[
    { "section": "CsvFileReader" },
    {
        "section": "InvokePipeline",
        "options": {
            "pipeline_id": 2,
        }
    },
    {
        "section": "InvokePipeline",
        "options": {
            "pipeline_id": 3,
        }
    },
]

# Options

  • pipeline_id - Pipeline id required
    • Type: PrimaryKeyRelated

# JSONReader

The JSONReader source section allows reading JSON files and yields the JSON items as a list.

This section expects the previous item to contain the path to a JSON file (by default under /path).

Example:

[
    {
        "path": "/path/to/file.json"
    }
]

Let's say we have a JSON file with the following structure:

{
    "groupSet": [
        {
            "offers": [
                {"name": "Offers 1"},
                {"name": "Offers 2"}
            ]
        },
        {
            "offers": [
                {"name": "Offers 3"},
                {"name": "Offers 4"}
            ]
        }
    ]
}

We can use the JSONReader section to read the offers items from the first element of the groupSet array:

{
    "section": "JSONReader",
    "options": {
        "items": "/groupSet/0/offers"
    }
}

JSON Pointer syntax can be used to specify the path to data in a JSON file, which is a string of tokens separated by /. For getting more information about the JSON Pointer syntax, please refer to the JSON Pointer documentation (opens new window).

Invalid data paths will result in an empty list being returned. It will always return a list, even if the data path points to a dictionary.

# Set statistics total

This section allows configuring the pipeline total. See the XMLReader documentation for details; it works similarly for both sections.
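
A sketch using the total option (described below), assuming the parsed JSON document exposes the total count under a top-level total key and the items live under /results:

{
    "section": "JSONReader",
    "options": {
        "items": "/results",
        "total": "{{ document.total }}"
    }
}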

# Options

  • items - Items

    • JSON Pointer to the list of items that will be passed on to the next section individually. When not defined, the whole document is passed on as one item.
    • Type: JSONPointer
    • Default: ''
  • path - Path

    • JSON Pointer addressing the path to the file to be loaded.
    • Type: JSONPointer
    • Default: JsonPointer('/path')
  • target - Target

    • JSON Pointer for the place where the result is inserted in the processed item. When not set, the processed item is replaced by the resulting item.
    • Type: JSONPointer
    • Default: null
  • total - Total count

    • When processing multiple items with data, such as HTTP responses, the pipeline statistics and progressbar keep being updated. When the first item/HTTP response contains information about the total amount of items to be processed, this information can be set using the total option. The total option accepts an expression, where the previous item / HTTP response can be accessed with item and the parsed content body can be accessed with document.
    • Type: Expression
    • Default: null

# JSONWriter

The JSONWriter section consumes items and writes them to a JSON-file as a list of items. The path to the file is yielded to the next section which may process it further.

In detail, the section writes all items consumed from the prior section into files on the temp file system of the ETL worker. It does not yield the consumed items to the next section but it yields only one item per written file, which contains the path to the file on the disk.

The written files are automatically removed after they are processed by the next section in order to keep disk usage low.

The items are written in batches (configurable through batch_size). The filename is also configurable and acts as a template, where these values are replaced:

  • $batch - the batch number, starting with 1.
  • $date - the date when the pipeline was executed in the form YYYYMMDD.
  • $time - the date and time in the form YYYYMMDD-hhmmss; the date and time is the same for all written batch files for convenience, thus it does not reflect the time the file was written but the time the pipeline run was started.

Example configuration:

{
  "section": "JSONWriter",
  "options": {
    "batch_size": 200,
    "filename": "data-$date-$batch.json"
  }
},

Example output with two batches:

[
{"path": "/tmp/etl/json_writer/data-20221229-1.json", "size": 200},
{"path": "/tmp/etl/json_writer/data-20221229-2.json", "size": 173},
]

It is also possible to use simple item properties in the filename template. This has the effect that the items are grouped into multiple JSON output files.

Example configuration:

{
  "section": "JSONWriter",
  "options": {
    "batch_size": 200,
    "filename": "data-${archival_type}-${batch}.json"
  }
},

Only simple item properties such as strings, integers and booleans can be used in the filenames. Nested data structures such as lists and dicts are not supported.

WARNING

Be aware that when batch_size is set to a non-zero value and $batch is missing from the filename template, the section will yield the same file path / filename multiple times with multiple batches of items. Depending on what is done with the files, the file may be overwritten on the target system, which can result in missing data.

# Options

  • batch_size - Batch size

    • Amount of objects to be written in one file. Set it to 0 in order to write all items into one file.
    • Type: Integer
    • Default: 1000
  • filename - Filename

    • Filename template of the JSON file.
    • Type: Char
    • Default: 'data-$batch.json'
  • pretty - Pretty print

    • Pretty print items on multiple lines.
    • Type: Boolean
    • Default: false

# Logger

The Logger section is used for logging data to the stdout of the worker process. This is useful for inspecting the behavior of a pipeline. The Logger section re-yields all consumed items to the next section.

Example:

{
  "section": "Logger",
  "options": {
    "prefix": "Published",
    "fields": [
      "ID_NR_DATEI",
      "public_path"
    ],
    "oneline": true
  }
}

# Options

  • fields - List of field names

    • A list of fields to be logged for each item. This helps to reduce the amount of output for large items.
    • Type: List
    • Default: null
  • limit - Limit

    • Only log the first n items consumed in order to not fill the log.
    • Type: Integer
    • Default: 100
  • oneline - One line

    • Flag for disabling pretty-printing items on multiple lines. Especially useful in combination with fields in order to make the output more compact.
    • Type: Boolean
    • Default: false
  • prefix - Prefix

    • Prefix for each log entry in order to identify which logger has produced the output. This is helpful when combining multiple loggers in the same pipeline.
    • Type: Char
    • Default: 'Logger'

# Map

The map section applies functions defined as operations on all items in the pipeline and acts as the "transform" part of the ETL pipeline.

The map section accepts a list of operations, where each operation consists of a dict with these keys:

  • function: string with the name of the function to apply
  • args: list of arguments for the function; string values with a $ prefix are replaced by the item value identified by the name following the $. Nested values inside a dict can be accessed with dot notation, e.g. $field.nested_key.double_nested_key. This notation also supports selecting list elements, for instance with $list.0 or $list.-1 (see the example after this list).
  • kwargs: optional list of keyword arguments for the function.
  • target: the name of the variable, in which the output is stored within the item.
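
For illustration, a sketch of an operation using this dot notation, assuming the item carries a nested data.titles list and applying the documented capitalize function to its first element:

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": ["$data.titles.0"],
        "target": "title",
        "function": "capitalize"
      }
    ]
  }
}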

There are many functions that can be used within the Map section. One special function is the condition function, which makes it possible to perform a condition on the item and store the boolean result in the item.

Examples:

In the next example, the string function split is applied on the value of the field ARCHIVALIENART in order to split the string by newline into a list of strings.

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": [
          "$ARCHIVALIENART",
          "\r\n\r\n"
        ],
        "target": "archival_type",
        "function": "split"
      }
    ]
  }
}

In the next example, we are creating a datetime string and store it in the field _today on the item. The string can later be used for comparing dates.

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "target": "_today",
        "function": "now"
      },
      {
        "args": [
          "$_today",
          "%Y-%m-%d"
        ],
        "target": "_today",
        "function": "strftime"
      }
    ]
  }
}

In the next example, the Map section is used to compare the datestring SCHUTZFRISTENDE to the current datestring stored in _today.

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": [
          "and",
          [
            "$SCHUTZFRISTENDE",
            "is not",
            null
          ],
          [
            "$SCHUTZFRISTENDE",
            "<",
            "$_today"
          ]
        ],
        "target": "is_approval_required",
        "function": "condition"
      }
    ]
  }
}

# Map functions

Below, the functions usable in the Map section are listed and documented. Most functions are simply Python functions that are registered in the Map section, thus the documentation is extracted from the standard Python function and may be adapted to how the Map section functions when used.

# abs

Return the absolute value of the argument.

# all

Return True if bool(x) is True for all values x in the iterable.

If the iterable is empty, return True.

# any

Return True if bool(x) is True for any x in the iterable.

If the iterable is empty, return False.

# ascii

Return an ASCII-only representation of an object.

As repr(), return a string containing a printable representation of an object, but escape the non-ASCII characters in the string returned by repr() using \x, \u or \U escapes. This generates a string similar to that returned by repr() in Python 2.

# bool

bool(x) -> bool

Returns True when the argument x is true, False otherwise. The builtins True and False are the only two instances of the class bool. The class bool is a subclass of the class int, and cannot be subclassed.

# bytes

bytes(iterable_of_ints) -> bytes
bytes(string, encoding[, errors]) -> bytes
bytes(bytes_or_buffer) -> immutable copy of bytes_or_buffer
bytes(int) -> bytes object of size given by the parameter initialized with null bytes
bytes() -> empty bytes object

Construct an immutable array of bytes from:

  • an iterable yielding integers in range(256)
  • a text string encoded using the specified encoding
  • any object implementing the buffer API.
  • an integer

# capitalize

Return a capitalized version of the string.

More specifically, make the first character have upper case and the rest lower case.

# chr

Return a Unicode string of one character with ordinal i; 0 <= i <= 0x10ffff.

# condition

Apply a condition and store the boolean result.

The condition function accepts a condition in the same way the filter conditions can be defined in the filter section. The difference is that the result can be stored as boolean in a field when used in the Map section.

See also the separate chapter on conditions in the ETL pipeline.

Example:

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": [
          "$ZUGAENGLICHKEIT_ID",
          "in",
          [10007, 10009]
        ],
        "target": "files_are_public",
        "function": "condition"
      }
    ]
  }
}

# count

S.count(sub[, start[, end]]) -> int

Return the number of non-overlapping occurrences of substring sub in string S[start:end]. Optional arguments start and end are interpreted as in slice notation.

# date_to_int

The function takes a date string that can be parsed into a date (using dateparse.parse_date) and converts it into an integer value. E.g. 2022-06-19 will be converted to the integer 20220619.

Examples:

{
    "args": [
        "2022-06-19",
    ],
    "target": "output",
    "function": "date_to_int",
},
{
    "args": [
        "$field",
    ],
    "target": "other_output",
    "function": "date_to_int",
}

# datetime

datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])

The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.

# dict

dict() -> new empty dictionary
dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs
dict(iterable) -> new dictionary initialized as if via: d = {}; for k, v in iterable: d[k] = v
dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

# endswith

S.endswith(suffix[, start[, end]]) -> bool

Return True if S ends with the specified suffix, False otherwise. With optional start, test S beginning at that position. With optional end, stop comparing S at that position. suffix can also be a tuple of strings to try.

# enumerate

Return an enumerate object.

iterable an object supporting iteration

The enumerate object yields pairs containing a count (from start, which defaults to zero) and a value yielded by the iterable argument.

enumerate is useful for obtaining an indexed list: (0, seq[0]), (1, seq[1]), (2, seq[2]), ...

# filter

The filter function can be used for removing items from a list.

Given there is a list of values:

{
    "input": ["Foo", "Bar", "Baz"],
}

with the filter function, we can define a condition which is applied to each list item and must evaluate to true in order for the item to be kept:

{
    "function": "filter",
    "args": ["$input", ["$value", "!=", "Bar"]],
    "target": "output",
}

results in:

{
    "input": ["Foo", "Bar", "Baz"],
    "output": ["Foo", "Baz"],
}

The item in the list can be addressed with $value in the condition. See the chapter on Conditions for details on how to write conditions.

# filter_empty

The filter_empty function is used for removing empty/null values from a list.

Given there is a list of values:

{
    "input": ["Foo", null, "", 0, "Bar"],
},

when the filter_empty function is used:

"operations": [
    {
        "function": "filter_empty",
        "args": ["$input"],
        "target": "output",
    }
]

it will remove all falsy values such as null, empty string or 0:

{
    "input": ["Foo", None, "", "Bar"],
    "output": ["Foo", "Bar"],
}

# find

S.find(sub[, start[, end]]) -> int

Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Return -1 on failure.

# float

Convert a string or number to a floating point number, if possible.

# format

S.format(*args, **kwargs) -> str

Return a formatted version of S, using substitutions from args and kwargs. The substitutions are identified by braces ('{' and '}').
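
A sketch of a possible Map operation, assuming the format string is passed as the first argument and the remaining args fill the braces (the format string, field and target names are illustrative):

{
    "args": [
        "{} ({})",
        "$title",
        "$year"
    ],
    "target": "display_title",
    "function": "format"
}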

# format_map

S.format_map(mapping) -> str

Return a formatted version of S, using substitutions from mapping. The substitutions are identified by braces ('{' and '}').

# generate_identifier

The function takes a string and returns a hash value of the string. The hash value is truncated to 32 characters. Mainly used for generating an identifier for a string. This will always generate the same hash value for the same string.
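
A sketch of a possible Map operation, assuming the string to hash is passed as the only argument (the field and target names are illustrative):

{
    "args": [
        "$signature"
    ],
    "target": "legacy_hash",
    "function": "generate_identifier"
}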

# hash

Return the hash value for the given object.

Two objects that compare equal must also have the same hash value, but the reverse is not necessarily true.

# hex

Return the hexadecimal representation of an integer.

hex(12648430)
'0xc0ffee'

# index

S.index(sub[, start[, end]]) -> int

Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Raises ValueError when the substring is not found.

# int

int([x]) -> integer
int(x, base=10) -> integer

Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating point numbers, this truncates towards zero.

If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

int('0b100', base=0)
4

# isalnum

Return True if the string is an alpha-numeric string, False otherwise.

A string is alpha-numeric if all characters in the string are alpha-numeric and there is at least one character in the string.

# isalpha

Return True if the string is an alphabetic string, False otherwise.

A string is alphabetic if all characters in the string are alphabetic and there is at least one character in the string.

# isascii

Return True if all characters in the string are ASCII, False otherwise.

ASCII characters have code points in the range U+0000-U+007F. Empty string is ASCII too.

# isdecimal

Return True if the string is a decimal string, False otherwise.

A string is a decimal string if all characters in the string are decimal and there is at least one character in the string.

# isdigit

Return True if the string is a digit string, False otherwise.

A string is a digit string if all characters in the string are digits and there is at least one character in the string.

# isidentifier

Return True if the string is a valid Python identifier, False otherwise.

Call keyword.iskeyword(s) to test whether string s is a reserved identifier, such as "def" or "class".

# islower

Return True if the string is a lowercase string, False otherwise.

A string is lowercase if all cased characters in the string are lowercase and there is at least one cased character in the string.

# isnumeric

Return True if the string is a numeric string, False otherwise.

A string is numeric if all characters in the string are numeric and there is at least one character in the string.

# isprintable

Return True if the string is printable, False otherwise.

A string is printable if all of its characters are considered printable in repr() or if it is empty.

# isspace

Return True if the string is a whitespace string, False otherwise.

A string is whitespace if all characters in the string are whitespace and there is at least one character in the string.

# istitle

Return True if the string is a title-cased string, False otherwise.

In a title-cased string, upper- and title-case characters may only follow uncased characters and lowercase characters only cased ones.

# isupper

Return True if the string is an uppercase string, False otherwise.

A string is uppercase if all cased characters in the string are uppercase and there is at least one cased character in the string.

# join

Join a list of items as strings into one string. The following kwargs can be used:

  • separator: The separator used to join the items. Defaults to -.
  • flatten: A boolean indicating whether the list should be flattened before joining. Defaults to false.

Example:

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "join",
        "args": ["$record_id", "$descriptor_id"],
        "kwargs": {"separator": "-"},
        "target": "identifier"
      }
    ]
  }
}

# len

Return the number of items in a container.
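
A sketch of a possible Map operation, assuming the container is passed as the only argument (the field and target names are illustrative):

{
    "args": [
        "$descriptors"
    ],
    "target": "descriptor_count",
    "function": "len"
}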

# list

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

# ljust

Return a left-justified string of length width.

Padding is done using the specified fill character (default is a space).

# lower

Return a copy of the string converted to lowercase.

# lstrip

Return a copy of the string with leading whitespace removed.

If chars is given and not None, remove characters in chars instead.

# max

max(iterable, *[, default=obj, key=func]) -> value
max(arg1, arg2, *args, *[, key=func]) -> value

With a single iterable argument, return its biggest item. The default keyword-only argument specifies an object to return if the provided iterable is empty. With two or more arguments, return the largest argument.

# min

min(iterable, *[, default=obj, key=func]) -> value
min(arg1, arg2, *args, *[, key=func]) -> value

With a single iterable argument, return its smallest item. The default keyword-only argument specifies an object to return if the provided iterable is empty. With two or more arguments, return the smallest argument.

# now

Returns new datetime object representing current time local to tz.

tz: Timezone object.

If no tz is specified, uses local timezone.

# nullif

Returns None if the condition is true. Otherwise, the value is returned.

See also the separate chapter on conditions in the ETL pipeline.

Examples:

{
    "args": [
        "$input",
        ["$input", "==", "unknown"],
    ],
    "target": "output",
    "function": "nullif",
}
{
    "args": [
        "$input",
        ["and", ["$input", "is not", None], ["$input", "<", "0001-01-01"]],
    ],
    "target": "output",
    "function": "nullif",
}

# oct

Return the octal representation of an integer.

oct(342391)
'0o1234567'

# ord

Return the Unicode code point for a one-character string.

# parse_datetime

Parse a string and return a datetime.datetime.

This function supports time zone offsets. When the input contains one,
the output uses a timezone with a fixed offset from UTC.

Raise ValueError if the input is well formatted but not a valid datetime.
Return None if the input isn't well formatted.

# partition

Partition the string into three parts using the given separator.

This will search for the separator in the string. If the separator is found, returns a 3-tuple containing the part before the separator, the separator itself, and the part after it.

If the separator is not found, returns a 3-tuple containing the original string and two empty strings.

# pick

When provided a list of keys, it picks all corresponding values from a list of dicts.

Example 1:

{
    "section": "Map",
    "options": {
        "operations": [
            {
                "args": ["$people", "name"],
                "target": "list_of_names",
                "function": "pick"
            }
        ]
    }
}

item:

{
    "people": [{"name": "Hans"}, {"name": "Peter"}]
}

returns:

{
    "people": [{"name": "Hans"}, {"name": "Peter"}],
    "list_of_names": ["Hans","Peter"]
}

Example 2:

{
    "section": "Map",
    "options": {
        "operations": [
            {
                "args": ["$people", "name", "age"],
                "target": "list_of_names",
                "function": "pick"
            }
        ]
    }
}

item:

{
    "people": [{"name": "Hans", "age": 20}, {"name": "Peter", "age": 21}]
}

returns:

{
    "people": [{"name": "Hans", "age": 20}, {"name": "Peter", "age": 21}],
    "list_of_names": [("Hans", 20),("Peter", 21)]
}

# pow

Equivalent to base**exp with 2 arguments or base**exp % mod with 3 arguments.

Some types, such as ints, are able to use a more efficient algorithm when invoked using the three argument form.

# range

range(stop) -> range object
range(start, stop[, step]) -> range object

Return an object that produces a sequence of integers from start (inclusive) to stop (exclusive) by step. range(i, j) produces i, i+1, i+2, ..., j-1. start defaults to 0, and stop is omitted! range(4) produces 0, 1, 2, 3. These are exactly the valid indices for a list of 4 elements. When step is given, it specifies the increment (or decrement).

# removeprefix

Return a str with the given prefix string removed if present.

If the string starts with the prefix string, return string[len(prefix):]. Otherwise, return a copy of the original string.

# removesuffix

Return a str with the given suffix string removed if present.

If the string ends with the suffix string and that suffix is not empty, return string[:-len(suffix)]. Otherwise, return a copy of the original string.

# replace

Return a copy with all occurrences of substring old replaced by new.

count: Maximum number of occurrences to replace. -1 (the default value) means replace all occurrences.

If the optional argument count is given, only the first count occurrences are replaced.
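
A sketch of a possible Map operation, assuming the string is passed first, followed by the old and new substrings (the field name is illustrative):

{
    "args": [
        "$signature",
        ".",
        "-"
    ],
    "target": "signature",
    "function": "replace"
}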

# reversed

Return a reverse iterator over the values of the given sequence.

# rfind

S.rfind(sub[, start[, end]]) -> int

Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Return -1 on failure.

# rindex

S.rindex(sub[, start[, end]]) -> int

Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Raises ValueError when the substring is not found.

# rjust

Return a right-justified string of length width.

Padding is done using the specified fill character (default is a space).

# round

Round a number to a given precision in decimal digits.

The return value is an integer if ndigits is omitted or None. Otherwise the return value has the same type as the number. ndigits may be negative.

# rpartition

Partition the string into three parts using the given separator.

This will search for the separator in the string, starting at the end. If the separator is found, returns a 3-tuple containing the part before the separator, the separator itself, and the part after it.

If the separator is not found, returns a 3-tuple containing two empty strings and the original string.

# rstrip

Return a copy of the string with trailing whitespace removed.

If chars is given and not None, remove characters in chars instead.

# set

set() -> new empty set object
set(iterable) -> new set object

Build an unordered collection of unique elements.

# setif

The function sets the value in the target to a specific value when the condition is true.

The first argument is the resulting value (which may contain the value of another field when prefixed with $). The second argument is a condition. See the separate chapter on conditions in the documentation.

When the condition resolves to false, the target field is not updated. But if the field does not yet exist, it is automatically set to None in order to have a consistent set of fields over all items in the pipeline.

Examples:

{
    "args": [
        "A value",
        ["$input", "==", "A"],
    ],
    "target": "output",
    "function": "setif",
},
{
    "args": [
        "$otherField",
        ["$input", "!=" "A"],
    ],
    "target": "output",
    "function": "setif",
}

# setvalue

The function simply sets the value in the target to a specific value. The value argument is the resulting value (which may contain the value of another field when prefixed with $). Examples:

{
    "args": [
        "some value",
    ],
    "target": "some_output",
    "function": "setvalue",
},
{
    "args": [
        "$otherField",
    ],
    "target": "other_output",
    "function": "setvalue",
}

# slice

slice(stop)
slice(start, stop[, step])

Create a slice object. This is used for extended slicing (e.g. a[0:10:2]).

# sorted

Return a new list containing all items from the iterable in ascending order.

A custom key function can be supplied to customize the sort order, and the reverse flag can be set to request the result in descending order.

# split

A more robust implementation of Python's str.split. If an exception is raised (e.g. when trying to split None), an empty list is returned.
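
A sketch of a possible Map operation, assuming the string is passed as the first argument and an optional separator as the second (the field and target names are illustrative):

{
    "args": [
        "$keywords",
        ";"
    ],
    "target": "keyword_list",
    "function": "split"
}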

# splitlines

Return a list of the lines in the string, breaking at line boundaries.

Line breaks are not included in the resulting list unless keepends is given and true.

# startswith

S.startswith(prefix[, start[, end]]) -> bool

Return True if S starts with the specified prefix, False otherwise. With optional start, test S beginning at that position. With optional end, stop comparing S at that position. prefix can also be a tuple of strings to try.
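
A sketch of a possible Map operation, assuming the string is passed as the first argument and the prefix as the second (the field, prefix and target names are illustrative):

{
    "args": [
        "$signature",
        "CH-"
    ],
    "target": "has_ch_prefix",
    "function": "startswith"
}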

# str

str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

# strftime

format -> strftime() style string.

# strip

Return a copy of the string with leading and trailing whitespace removed.

If chars is given and not None, remove characters in chars instead.

# strptime

string, format -> new datetime parsed from a string (like time.strptime()).

# sum

Return the sum of a 'start' value (default: 0) plus an iterable of numbers

When the iterable is empty, return the start value. This function is intended specifically for use with numeric values and may reject non-numeric types.

# swapcase

Convert uppercase characters to lowercase and lowercase characters to uppercase.

# timedelta

Difference between two datetime values.

timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

All arguments are optional and default to 0. Arguments may be integers or floats, and may be positive or negative.

# title

Return a version of the string where each word is titlecased.

More specifically, words start with uppercased characters and all remaining cased characters have lower case.

# tuple

Built-in immutable sequence.

If no argument is given, the constructor returns an empty tuple. If iterable is specified the tuple is initialized from iterable's items.

If the argument is a tuple, the return value is the same object.

# upper

Return a copy of the string converted to uppercase.

# zfill

Pad a numeric string with zeros on the left, to fill a field of the given width.

The string is never truncated.
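
A sketch of a possible Map operation, assuming the numeric string is passed as the first argument and the width as the second (the field and target names are illustrative):

{
    "args": [
        "$ID_NR",
        8
    ],
    "target": "ID_NR_PADDED",
    "function": "zfill"
}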

# zip

The zip function is used for combining multiple lists into one list.

Given there are some lists of data in an item:

{
    "ids": [7, 5, 9],
    "names": ["foo", "bar", "baz"]
}

with the zip function we can pair them together:

"operations": [
    {
        "function": "zip",
        "args": [["$ids", "$names"]],
        "target": "result"
    }
]

and then get a combined list:

{
    "result": [[7, "foo"], [5, "bar"], [9, "baz"]],
    ...
}

and we can also do the inverse operation with the same function by reducing a list level in the input:

"operations": [
    {
        "function": "zip",
        "args": ["$result"],
        "target": "result2"
    }
]

and get:

{
    "result2": [[7, 5, 9], ["foo", "bar", "baz"]],
    ...
}

# Options

  • operations - Operations required
    • A list of operations, each consisting of at least a function, args and a target. See section documentation for details on how to define operations.
    • Type: List

# MissingSourceItems

Takes a list of objects representing the source objects, expecting a key named identifier_field for every object. By comparing the identifier_field in the source with the lookup_field in the system, it yields the objects which are missing in the source but present in the system.
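
Example (a sketch based on the options documented below; the model and field values are illustrative):

{
  "section": "MissingSourceItems",
  "options": {
    "model": "record",
    "identifier_field": "ID_NR",
    "lookup_field": "identifier",
    "fields": ["identifier"]
  }
}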

# Options

  • model - Model name required

    • Name of the model in lowercase.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
  • fields - Fields

    • A list of field names to yield.
    • Type: List
    • Default: ['identifier']
  • filter - Filter

    • A JSON object containing the filters to apply.
    • Type: Dict
  • identifier_field - Identifier field name

    • The name of the field on the item containing the identifier.
    • Type: Char
    • Default: 'ID_NR'
  • lookup_field - Lookup field name

    • Name of the field on the model used to look up the values from identifier_field.
    • Type: Char
    • Default: 'identifier'

# MyColexSource

This section fetches data from MyColex.

Use endpoint to specify which data to fetch.

To authenticate against the MyColex API, an API Token needs to be provided or set as an environment variable (MYCOLEX_API_TOKEN).

Example:

{
  "section": "MyColexSource",
  "options": {
    "base_url": "https://mycolex.example.org",
    "endpoint": "inventory",
    "page_size": 100
  }
}

Example:

{
  "section": "QuerysetSource",
  "options": {
    "model": "record",
    "filter": {
      "is_public": true,
      "files_are_public": true
    }
  }
},
{
  "section": "MyColexSource",
  "options": {
    "base_url": "https://mycolex.example.org",
    "endpoint": "image",
    "page_size": 100,
    "filter": {
      "published": true
    },
    "filter_by_previous_in": {
      "previous_field": "identifier",
      "filter_field": "objects"
    }
  }
}

# Options

  • endpoint - Endpoint required

    • Which API endpoint to query data from (e.g. inventory, building, event, etc.).
    • Type: Char
  • api_token - API Token

    • The token used to authenticate in MyColex. This is usually configured as environment variable and can thus be omitted.
    • Type: Char
  • base_url - URL

    • The URL to MyColex. This is usually configured as environment variable and can thus be omitted.
    • Type: Char
  • filter - Filter

    • Filter parameters passed to the endpoint.
    • Type: JSON
  • filter_by_previous_in - Filter requested data by items in the pipeline

    • The filter_by_previous_in option allows selecting rows that have a relation to items consumed by previous sections in the pipeline. When this field is configured, it must contain an object with two settings: filter_field, the name of the field in the source system where the relation is stored, and previous_field, the name of the field in the consumed items where the foreign identifier is stored. See the example for reference.
    • Type: Dict
      • filter_field - Filter field name required

        • Name of the field in MyColex.
        • Type: Char
      • previous_field - Previous field name required

        • Name of the field in the pipeline item.
        • Type: Char
  • page_size - Elements per request

    • Amount of items that are loaded in one page. Usually the default is fine.
    • Type: Integer
    • Min: 1
    • Max: 1000

# PipelineSnippet

The PipelineSnippet will substitute all sections from an existing pipeline by passing the pipeline_id in the options. In the following example, all sections from the pipeline with id 2 will be substituted in the pipeline.

Example with input mode:

[
    { "section": "CsvFileReader" },
    {
        "section": "PipelineSnippet",
        "options": {
            "pipeline_id": 2,
        }
    },
]

# Options

  • pipeline_id - Pipeline id required
    • Type: PrimaryKeyRelated

# Publish

The publish section publishes objects of various models from the internal database to the external database.

In the internal database we have three groups of models:

  1. Models that are stored in a regular Postgres database in the internal and external system with the same model. They can be published with this Publish section.
  2. Records are stored in Postgres in the internal system, but only stored in Solr in the external system. They cannot be published with the Publish section, but with the Solr section, which is designed for this exact case.
  3. Models that are never published.

In the Publish section, we are publishing objects of models of type 1, expecting to have a 1:1 equivalent in the external system.

Examples:

{
  "section": "Publish",
  "options": {
    "model": "file"
  }
}
{
  "section": "Publish",
  "options": {
    "model": "descriptorrecordrelation"
  }
}

# Options

  • model - Model name required

    • Name of the model to be published, with all lowercase characters.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
  • batch_size - Batch size

    • Amount of objects to be published in one batch.
    • Type: Integer
    • Default: 200
  • prune - Prune

    • Delete all objects of this model from the external database before starting to publish. Use with caution and only when we really have to replace all data.
    • Type: Boolean
    • Default: false

# QuerysetSource

The queryset source section loads objects from the database and yields the objects into the pipeline. It is possible to apply simple queries by using "filter" and "exclude" functions of the Django ORM.

This can be used in combination with transform sections when data must be changed within the database.

Example:

{
  "section": "QuerysetSource",
  "options": {
    "model": "record",
    "filter": {
      "is_public": true,
      "files_are_public": true
    }
  }
}

# Options

  • model - Model name required

    • Name of the model from which we want to load data.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
  • batch_size - Batch size

    • The number of objects that are loaded from the database at once.
    • Type: Integer
    • Default: 100
  • distinct - Distinct

    • If checked, only distinct objects are yielded into the pipeline.
    • Type: Boolean
  • exclude - Exclude

    • Objects passing this condition are not yielded into the pipeline. The condition is defined in a Django ORM compatible way.
    • Type: Dict
  • filter - Filter

    • Objects passing this condition are yielded into the pipeline. The condition is defined in a Django ORM compatible way.
    • Type: Dict

# RawSource

The raw source section allows injecting raw data into the pipeline. This can be helpful for testing pipelines when you want to work with a specific set of items.

Example:

{
  "section": "RawSource",
  "options": {
    "data": [
      {"key": "foo"},
      {"key": "bar"}
    ]
  }
}

# Options

  • data - Data required
    • Raw list of items to feed into the pipeline.
    • Type: JSON

# ResolveForeignKey

Resolves foreign keys for a given model and pops the result on the item.

The ResolveForeignKey section is used when we have a foreign key identifier but need the internal primary key of the object. The section looks up the object by the identifier field and replaces the identifier with the pk so that later sections, such as the create sections, can process the data.

When we cannot find an object with this identifier in our database, the behavior can be configured in the missing option:

  • raise (default): raise an exception and stop the pipeline execution. This can be used when we do not expect that it is missing and need to investigate such cases.
  • drop: remove the whole item from the pipeline and do not further process it. This can be used when importing n:n relations where both ends must exist.
  • nullify: set the value to null. This can be used when importing 1:1 relations which may or may not have the target imported as well.

Example:

{
  "section": "ResolveForeignKey",
  "options": {
    "model": "descriptor",
    "lookup": "identifier",
    "source": "ID_NR_1",
    "target": "from_descriptor_id",
    "missing": "drop"
  }
},
{
  "section": "ResolveForeignKey",
  "options": {
    "model": "descriptor",
    "lookup": "identifier",
    "source": "ID_NR_2",
    "target": "to_descriptor_id",
    "missing": "drop"
  }
},

# Options

  • lookup - Model field required

    • Name of the field on the model containing the identifier. Usually identifier.
    • Type: Char
  • model - Model name required

    • Name of the model where the lookup should be executed.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
  • source - Source field required

    • Name of the field in the source item to get the identifier from.
    • Type: Char
  • target - Target field required

    • Name of the field where the primary key should be stored in.
    • Type: Char
  • missing - Missing behavior

    • Behavior, when the object is not found.
    • Type: Choice
    • Default: 'raise'
    • Choices:
      • raise: Abort the pipeline run with an exception (default)
      • drop: Remove the item from the pipeline
      • nullify: Set the value to null
  • values - Field values to retrieve

    • A list of field names for which values should be retrieved. When there is only one field name, the value will be directly inserted in target. With multiple field names, a key/value object will be inserted.
    • Type: List
    • Default: ['id']

# S3Uploader

The S3Uploader section uploads single files to an S3 bucket.

The section consumes items with a path to a temporary file (for instance produced by the JSONWriter section), reads the files from the temporary file system and uploads each file to an S3 bucket.

S3 configuration:

The uploader section uses preconfigured, named S3 targets, because the credentials (secrets) must not be configured / exposed in the pipeline for security reasons. The available S3 targets are configured through the environment variable S3_CONFIGS, where each preconfigured target has a name. The section needs to know the name of the target, which is configured with the s3_name option.

Access / ACL:

By default, the uploaded files are not publicly / anonymously accessible, because the default ACL setting is private. In order to make it available to everyone anonymously, the acl option can be set to public-read.

Pruning:

The prune option lets the section remove older files (configurable through keep_hours), so that the S3 bucket does not fill up when the pipeline is configured to run automatically / nightly. Be aware that for the prune option to work, a path needs to be configured, so that the files are uploaded to a folder instead of the bucket root. This prevents accidentally deleting a whole bucket.

Full example:

Input:

[
  {"path": "/tmp/etl/json_writer/data.json"}
]

Section configuration:

{
  "section": "S3Uploader",
  "options": {
    "s3_name": "json-dump",
    "path": "enums",
    "delete": true,
    "acl": "private",
    "prune": true,
    "keep_hours": 120
  }
}

# Options

  • s3_name - S3 config name required

    • The name of the preconfigured S3 configuration.
    • Type: Char
  • acl - S3 ACL

    • S3 ACL setting, determining whether the file can be downloaded publicly or not.
    • Type: Choice
    • Default: 'private'
    • Choices:
      • private: private
      • public-read: public-read
  • delete - Delete

    • Delete the local file after uploading in order to reduce disk usage to a minimum.
    • Type: Boolean
    • Default: true
  • keep_hours - Keep hours

    • When pruning, keep files on the s3 that are younger than the configured amount of hours.
    • Type: Integer
    • Default: 72
  • path - Path

    • The path (directory) on the s3 within the bucket.
    • Type: Char
    • Default: null
  • prune - Prune

    • Before uploading, remove existing files from the s3. Requires a path to be set for precaution.
    • Type: Boolean
    • Default: false

# ScopeArchivSource

A generic section to fetch data from Scope Archiv using the Scope Archiv Connector.

The Scope Archiv Connector is a separate, optional service which reads preconfigured Oracle views from a Scope Archiv database and provides them as a REST API to the ETL pipeline. The ScopeArchivSource section handles the communication with the service.

Use table_name and columns to specify which data to fetch. order_columns defines the columns used to order the data; they should be unique together.

Take care when configuring order_columns: it should be one or more columns which are indexed and unique together. They must be indexed in order to have good enough performance on Oracle, and they must be unique together in order for the pagination to work properly; otherwise data will be missing. In this context, the order is solely the order in which the items are loaded and processed. Since items cannot have dependencies within the pipeline, this must be optimized for best performance.

Use a single column when querying "primary" data views, containing records which each have their own unique identifier. Use multiple columns, on the other hand, when reading from junction tables, which usually do not have a single column that is unique.

Example:

{
  "section": "ScopeArchivSource",
  "options": {
    "columns": [
      "ID_NR",
      "TITEL",
      "PARENT_ID_NR",
      "ZWEIG_POSITION",
      "SIGNATUR",
      "ANZEIGE_INTERNET"
    ],
    "page_size": 100,
    "filter_sql": "\"HIERARCHIE_PFAD\" NOT LIKE '00000796390000546102%'",
    "table_name": "public.mvk_dls_vrzng_enht_1",
    "order_columns": [
      "ID_NR"
    ]
  }
}

With the above example, the section acts as the first section in the pipeline. It can also be used for looking up specific items based on other items that are already in the pipeline, e.g. all files for specific records.

Example:

{
  "section": "QuerysetSource",
  "options": {
    "model": "record",
    "filter": {
      "is_public": true,
      "files_are_public": true
    }
  }
},
{
  "section": "ScopeArchivSource",
  "options": {
    "columns": [
      "PID",
      "ID_NR_VRZNG_ENHT"
    ],
    "page_size": 100,
    "table_name": "public.mvk_dls_ve_vrkng_fdra_1",
    "order_columns": [
      "PID",
      "ID_NR_VRZNG_ENHT"
    ],
    "filter_by_previous_in": {
      "column": "ID_NR_VRZNG_ENHT",
      "previous_field": "identifier"
    }
  }
}

# Options

  • columns - Columns required

    • A list of columns to be selected from the source.
    • Type: ListSerializer
  • order_columns - Order columns required

    • A list of one or more columns for ordering, which are indexed and unique together.
    • Type: ListSerializer
  • table_name - Table name required

    • The name of the table / oracle view to select from.
    • Type: Char
  • filter_by_previous_in - Filter requested data by items in the pipeline

    • The filter_by_previous_in option allows selecting rows that have a relation to items consumed by previous sections in the pipeline. When this field is configured, it must contain an object with two settings: column, the name of the column in the source system where the relation is stored, and previous_field, the name of the field in the consumed items where the foreign identifier is stored. See the example for reference.
    • Type: Dict
      • column - Filter field name required

        • Name of the column in the source system.
        • Type: Char
      • previous_field - Previous field name required

        • Name of the field in the pipeline item.
        • Type: Char
  • filter_sql - SQL filter clause

    • Additional WHERE clause in raw SQL, for when we want to filter already at the source to improve performance. The filter_sql statement is joined with the generated filter_by_previous_in clause with AND.
    • Type: Char
  • page_size - Page size

    • Amount of items that are loaded in one page. Usually the default is fine.
    • Type: Integer
    • Min: 1
    • Max: 1000
  • pagination - Pagination

    • Depending on how the table looks, different pagination implementations are more efficient. Can be cursor or offset. By default, cursor pagination is used when sorting by only one column, and offset pagination for multiple columns. Usually it is best not to configure this option.
    • Type: Choice
    • Choices:
      • cursor: Cursor-based pagination
      • offset: Pagination with offset and limit

# SkipProcessedFiles

The SkipProcessedFiles section detects as early as possible when we are processing files that we have already processed before, and skips those files.

Importing files is very time consuming. That's why we want to skip files when we have already processed and imported them before. This makes it possible to abort and restart pipelines when needed, and to configure them so that items are skipped as long as they have already been processed.

In order to also be able to make updates to the configuration (e.g. tile sizes of images) and re-process the files, a pipeline_version string is configured, which is stored on each file. If the pipeline version string from the pipeline configuration matches the one stored on the file, the file is skipped. When the configuration is changed and the goal is to rerun everything, the pipeline version in the section configuration is increased and the pipeline rerun. With this mechanism, we can successively update the images and always have data published.

Be aware that this is optimized for non-Fedora sources by default. In the Fedora case, you must set the fedora flag to true so that only the first part of the identifier is compared, which is equal to the PID in the fedora case. This is necessary because one package (PID) can result in multiple files.

Example:

{
  "section": "SkipProcessedFiles",
  "options": {
    "identifier_field": "ID_NR_DATEI",
    "pipeline_version": "1.0.0"
  }
}

# Options

  • pipeline_version - Pipeline version required

    • Pipeline version string. The skipping is based on this string. Can be changed when the pipeline is updated.
    • Type: Char
  • batch_size - Batch size

    • Amount of items to be processed in one batch.
    • Type: Integer
    • Default: 50
  • fedora - Fedora mode

    • Set to true when the source is fedora, causing the identifiers to be compared by removing the path.
    • Type: Boolean
    • Default: false
  • identifier_field - Identifier field name

    • Name of the field containing the unique identifier of the file.
    • Type: Char
    • Default: 'ID_NR_DATEI'

# Slice

The slice section is used for only processing a certain slice of the items yielded by a previous section.

This is useful for testing pipeline configurations. It allows limiting the amount of items yielded, but also skipping the first n items by setting start to n.

Example:

{
  "section": "Slice",
  "options": {
    "limit": 100,
    "start": 0
  }
}

# Options

  • limit - Limit required

    • The maximum amount of items to be yielded.
    • Type: Integer
  • start - Start

    • Skip the first n items and do not yield them.
    • Type: Integer
    • Default: 0

# Solr

The Solr section indexes records in the external, public Solr instance.

In the DLS, the Solr instance is installed in the external part; data indexed in Solr is publicly available in the search and used for displaying data on the website.

Solr is configured in a way that it only indexes records that have the is_public flag set to True.

The Solr section consumes items from the previous section, identifies them based on the identifier_field (which must match the value on the record in the identifier field) and publishes them.

It is also possible to prune Solr, removing all data from it. As a consequence, the records can no longer be found on the website until indexing is fully completed.

Example:

{
  "section": "Solr",
  "options": {
    "prune": false,
    "batch_size": 500,
    "identifier_field": "identifier",
    "lookup_field": "identifier"
  }
}

# Options

  • batch_size - Batch size

    • Amount of items to be processed in one batch.
    • Type: Integer
    • Default: 200
  • identifier_field - Identifier field name

    • Name of the field in the consumed item that contains the unique identifier.
    • Type: Char
    • Default: 'identifier'
  • lookup_field - Lookup field name

    • Name of the field on the record used to look up the values in the identifier_field.
    • Type: Char
    • Default: 'identifier'
  • prune - Prune

    • Remove all data from Solr before starting to index.
    • Type: Boolean
    • Default: false

# Store

This section allows reading and writing values in the store. The store is a key-value store that is persisted in the database, and its values can be shared between all ETL pipelines over multiple pipeline runs. The store is organized in contexts. Each context has its own set of keys. The default context is default.

# Write operation

{
    "section": "Store",
    "options": {
        "write": {
            "await_success": true,
            "context": "default",
            "key": "my_key",
            "value": "my_value"
        }
    }
}

This will write the value my_value to the store with the key my_key in the default context. The context and key pair must be unique in the store. The await_success option can be used to wait for the whole pipeline to finish successfully before writing the value to the store. When this option is set to true (default), the value will be written to the store after the pipeline has finished successfully. In case of an error in any section of the pipeline, the value will not be written to the store.

When using the await_success option, the key-value pair will not be available in the pipeline for subsequent Store sections. Setting the await_success option to false will write the value to the store immediately.

The write operation needs a previous section in the pipeline, so it cannot be the first section.

# Read operation

The read option can be used to read a value from the store and set it to a key in the item.


{
    "section": "Store",
    "options": {
        "read": {
            "context": "default",
            "key": "my_key",
            "foreach": {
                "target_path": "/my_target_path"
            }
        }
    }
}

When the foreach option is used, the section will be executed for each item in the pipeline. The value from the store will be inserted into the item at the target_path key. The foreach option is optional and its target_path must be a valid JSON Pointer. When no foreach is specified, the item from the previous section will be replaced by a new item containing the store value.

# Delete operation

The delete option can be used to delete a value from the store.

{
    "section": "Store",
    "options": {
        "delete": {
            "context": "default",
            "key": "my_key"
        }
    }
}

# List operation

The list option can be used to read all values of a context from the store as a new item in the pipeline.

{
    "section": "Store",
    "options": {
        "list": {
            "key": "my_key",
            "context": "default"
        }
    }
}

This will insert a new item into the pipeline with a value key that contains a mapping of the existing key/value pairs within the store context.

# Set context operation

This operation sets the store context for all subsequent sections. This impacts later Store sections, but also sets the store context for expression fields in other sections. The default context is default.

To set the context to my_context:

{
    "section": "Store",
    "options": {
        "set_context": {
            "context": "my_context"
        }
    }
}

# Options

  • write - Write operation

    • Writes a key/value pair into the store.
    • Type: Dict
    • Default: null
      • key - Store key required

        • The key for writing the value. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • value - Store value required

      • await_success - Await success

        • When set to true, the value is not written until the pipeline terminates without errors.
        • Type: Boolean
        • Default: false
      • context - Store context

        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • remove_item - Remove pipeline item

        • When set to True, the pipeline item is removed from the pipeline after writing to the store.
        • Type: Boolean
        • Default: false
  • read - Read operation

    • Reads a value from the store and sets it on the processed item.
    • Type: Dict
    • Default: null
      • key - Store key required

        • The key to read the value from. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • context - Store context

        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • foreach - Foreach

        • When used, the value is read and updated into each processed item in the pipeline. When omitted, a new item is inserted into the pipeline containing the value.
        • Type: Dict
          • target_path - Path to store the result required
            • Path to the key on the item in the pipeline where the value will be stored. "" can be used to replace the entire item.
            • Type: Char
  • delete - Delete operation

    • Removes key/value pairs from the store.
    • Type: Dict
    • Default: null
      • context - Store context

        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • key - Store key

        • The key for the identifying the key/value pair to delete. When omitted, all key/value pairs in the context are deleted. It must contain only alphanumeric characters and underscores.
        • Type: Expression
        • Default: null
      • remove_item - Remove pipeline item

        • When set to True, the pipeline item is removed from the pipeline after deleting from the store.
        • Type: Boolean
        • Default: false
  • list - List operation

    • Reads all key/value pairs within a context from the store.
    • Type: Dict
    • Default: null
      • context - Store context
        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
  • set_context - Set context operation

    • Sets the default store context for later sections in the pipeline.
    • Type: Dict
    • Default: null
      • context - Store context required
        • The default store context to set for later sections.
        • Type: Char

# TemplateWriter

The TemplateWriter section consumes items and writes them to the output_filename using the template configured through template_filename. The template can have any content, along with jinja2 expressions to render the items. The items' context variable is called items. The path to the output file is yielded to the next section, which may process it further.

In detail, the section writes all items consumed from the prior section into files on the temp file system of the ETL worker. It does not yield the consumed items to the next section but it yields only one item per written file, which contains the path to the file on the disk.

The written files are automatically removed after they are processed by the next section in order to keep disk usage low.

The items are written in batches (configurable through batch_size). The batch_size is also accessible for the output_filename as a context variable.

  • batch - the batch number, starting with 1.
  • date - the date when the pipeline was executed in the form YYYYMMDD.
  • time - the date and time in the form YYYYMMDD-hhmmss; the date and time is the same for all written batch files for convenience, thus it does not reflect the time the file was written but the time the pipeline run was started.
  • items - The items accessible in the template for the current batch.

Example configuration:

{
  "section": "TemplateWriter",
  "options": {
    "batch_size": 200,
    "output_filename": "data-{{ date }}-{{ batch }}.xml",
    "template_filename": "template.xml"
  }
},

Example output with two batches:

[
  {"path": "/tmp/etl/template_writer/data-20221229-1.xml", "size": 200},
  {"path": "/tmp/etl/template_writer/data-20221229-2.xml", "size": 173}
]

WARNING

Be aware that when batch_size is set to a non-zero value and batch is missing in the output_filename template, the section will yield the same file path / filename multiple times with multiple batches of items. Depending on what is done with the files, the file may be overwritten on the target system and may result in missing data.

# Options

  • template_filename - Template filename required

  • batch_size - Batch size

    • Amount of objects to be written in one file. Set it to 0 in order to write all items into one file.
    • Type: Integer
    • Default: 1000
  • output_filename - Output filename

    • Filename template of the output file
    • Type: Expression
    • Default: data-.xml

# Terminate

When the Terminate section's condition is truthy for the first item, the pipeline is terminated, so that no further items are processed by previous sections.

The Terminate section is similar to the Filter section, but the Filter section keeps processing items individually and may remove them from the pipeline. The Terminate section is a bit different: it can be used for performance optimization when the condition indicates that no later item must be processed and therefore it does not make sense to keep consuming data from the data source.

The condition field of the Terminate section uses the newer expression field syntax for defining conditions.

Example:

{
    "section": "Terminate",
    "options": {
        "condition": "{{ item.value > 2 }}"
    }
}

# Options

  • condition - Terminate condition required
    • When the first processed item has a truthy condition, the pipeline is terminated.
    • Type: Expression

# Tree

The Tree section builds a tree based on the records, so that the records can be displayed as a tree on the website.

Create the tree (Node instances) based on records in the database. Starting with the root node, all immediate children nodes of this record are created. If the children have further children, they are recursively created as well.

Yield the data used to create the tree.

Example:

{
  "section": "Tree",
  "options": {
    "root_value": "-1"
  }
}

# Options

  • root_value - Root value
    • Identifier of the root record. There must be exactly one root record.
    • Type: Char
    • Default: '-1'

# TreeSorter

Sort items for building a tree.

The sorter makes sure that parents always are before their children in the pipeline.

The sorter also handles orphan nodes according to the configuration.

Options:

  • id_field: name of the ID field of a node.
  • parent_id_field: name of the parent ID field of a node.
  • root_value: parent_id_field value of the root node.
  • orphans: Handling of orphan nodes.

Orphan node options:

  • "drop": drop the records (default)
  • "raise": abort the pipeline run with an exception

# Options

  • id_field - ID field name

    • Type: Char
    • Default: 'ID_NR'
  • orphans - Orphan strategy

    • Type: Choice
    • Default: 'raise'
    • Choices:
      • drop: Drop the records (default)
      • raise: Abort the pipeline run with an exception
  • parent_id_field - Parent-ID field name

    • Type: Char
    • Default: 'PARENT_ID_NR'
  • root_value - Root value id

    • Type: Char
    • Default: '-1'

# Variables

Variables can be set at execution time, so that they can be used in expression fields of other sections. Variables can either be set raw (using the variables option) or by rendering the values as expressions.

Simple example:

[
    {
        "section": "Variables",
        "options": {
            "variables": {
                "mapping": {
                    "key1": "Label 1",
                    "key2": "Label 2"
                }
            }
        }
    },
    {
        "section": "AttributeMapper",
        "options": {
            "mapping": [
                {
                    "value": "{{ var.mapping.get(item.key, item.key) }}",
                    "target": "label"
                }
            ]
        }
    }
]

# Options

  • expressions - Expression variables

    • The values of these variables are evaluated as expressions before being set. item is only available when mode is set to each.
    • Type: Dict
    • Default: {}
  • mode - Mode

    • The mode defines, when and how often the variables are set.
    • Type: Choice
    • Default: 'once'
    • Choices:
      • once: Variables are set only once when starting the pipeline.
      • each: The variables are set per each item.
  • variables - Raw variables

    • Dictionary of variables, where the value is set without processing (raw).
    • Type: Dict
    • Default: {}

# XMLReader

The XMLReader section allows reading XML files and loading the contained items into the pipeline.

This section expects the previous section to yield a dictionary with a path key, as done for example by the HTTP section. The path key should point to an XML file. An example of a valid item is:

[
    {
        "path": "/path/to/file.xml"
    }
]

Let's say we have an XML file with the following structure:

<SearchDetailResponse xmlns="http://www.cmiag.ch/cdws/searchDetailResponse">
    <Verzeichnungseinheit>
        <ID>164494</ID>
        <Titel>Abteilung für die XYZ</Titel>
        <Verzeichnungsstufe>Serie</Verzeichnungsstufe>
        <Findmittel/>
    </Verzeichnungseinheit>
</SearchDetailResponse>

We can use the items option to specify the path to the items in the XML file. The path should be a valid JSON Pointer. For more information about the JSON Pointer syntax, please refer to the JSON Pointer documentation.

For example, if we want to yield the Verzeichnungseinheit items, we can use the following configuration:

{
    "section": "XMLReader",
    "options": {
        "items": "/SearchDetailResponse/Verzeichnungseinheit"
    }
}

This will yield the following items:

[
    {
        "ID": "164494",
        "Titel": "Abteilung für die XYZ",
        "Verzeichnungsstufe": "Serie",
        "Findmittel": null,
    }
]

# Making the child items a list

Let's say we have an XML file with the following structure:

<SearchDetailResponse xmlns="http://www.cmiag.ch/cdws/searchDetailResponse">
    <Verzeichnungseinheit>
        <ID>164494</ID>
        <Titel>Abteilung für die XYZ</Titel>
        <Verzeichnungsstufe>Serie</Verzeichnungsstufe>
        <Bilder>
            <Bild src="imageserver.ch/image123456.jpg">Bild 1</Bild>
        </Bilder>
        <Findmittel/>
    </Verzeichnungseinheit>
</SearchDetailResponse>

By default, the Bilder item will be a dictionary because there is only one Bilder item. Using the following default configuration:

{
    "section": "XMLReader",
}

This will yield the following item:

[
    {
        "SearchDetailResponse": {
            "@xmlns": "http://www.cmiag.ch/cdws/Verzeichnungseinheit",
            "Verzeichnungseinheit":  {
                    "ID": "164494",
                    "Titel": "Abteilung für die XYZ",
                    "Verzeichnungsstufe": "Serie",
                    "Bilder": {
                        "Bild": {
                            "@src": "imageserver.ch/image123456.jpg",
                            "#text": "Bild 1"
                        }
                    },
                    "Findmittel": null,
                },
        }
    }
]

If we want to make the Bilder items a list, we can use the ensure_list option:

{
    "section": "XMLReader",
    "options": {
        "ensure_list": ["Bilder"]
    }
}

This will yield the following item:

[
    {
        "SearchDetailResponse": {
            "@xmlns": "http://www.cmiag.ch/cdws/Verzeichnungseinheit",
            "Verzeichnungseinheit":  {
                    "ID": "164494",
                    "Titel": "Abteilung für die XYZ",
                    "Verzeichnungsstufe": "Serie",
                    "Bilder": [
                        {
                            "Bild": {
                                "@src": "imageserver.ch/image123456.jpg",
                                "#text": "Bild 1"
                            }
                        }
                    ],
                    "Findmittel": null,
                },
        }
    }
]

The ensure_list option ensures that the given elements are always parsed as lists. Only the tag name can be specified; all tags with that name are converted to lists.
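
For example, to also force the inner Bild elements into lists, both tag names can be listed (illustrative configuration):

{
    "section": "XMLReader",
    "options": {
        "ensure_list": ["Bilder", "Bild"]
    }
}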

# Using the path option

The path option specifies where the path to the XML file is found on the incoming item. It is a JSON Pointer that is resolved against the item yielded by the previous section; when not configured, the default /path is used. For example, if the previous section yields the following item:

[
    {
        "different_path_key": "/path/to/file.xml"
    }
]

We can use the following configuration:

{
    "section": "XMLReader",
    "options": {
        "path": "/different_path_key"
    }
}

# Keep the previous item data

By default, the previous item data is discarded and the item is replaced with the parsed result. If you want to keep the previous item data, you can use the target option. For example, combined with the items option from above, we can use the following configuration:

{
    "section": "XMLReader",
    "options": {
        "items": "/SearchDetailResponse/Verzeichnungseinheit",
        "target": "/xml_data"
    }
}

This will yield the following item:

[
    {
        "path": "/path/to/file.xml",
        "xml_data": [
            {
                "ID": "164494",
                "Titel": "Abteilung für die XYZ",
                "Verzeichnungsstufe": "Serie",
                "Findmittel": null,
            }
        ]
    }
]

# Set statistics total

The XMLReader section is often used in combination with an HTTP section, which supports pagination. In this case, the pipeline does not know upfront how many items it is about to process, so the progress bar and the section statistics are adjusted with every page.

This can be prevented when the queried API reports the total number of items with the first page. The total option can be used in the XMLReader section to tell the pipeline the total number of items to be processed. The information can be taken from the first response body or from the response headers.

Examples:

Given we are processing a response body such as:

<Response total="123" pagesize="15">
    <items>...</items>
</Response>

We can extract the total with this configuration:

{
    "section": "XMLReader",
    "options": {
        "total": "{{ document.Response.total }}",
        "items": "/Response/items"
    }
}

When the response carries a header such as x-total-results, this configuration sets the total:

{
    "section": "XMLReader",
    "options": {
        "total": "{{ item.headers['x-total-results'] }}"
    }
}

# Options

  • ensure_list - Ensure list

    • The tag names that should always be parsed as lists.
    • Type: List
    • Default: []
  • items - Items

    • JSON Pointer to the list of items that will be passed on to the next section individually. When not defined, the whole document is passed on as one item.
    • Type: JSONPointer
    • Default: ''
  • path - Path

    • JSON Pointer addressing the path to the file to be loaded.
    • Type: JSONPointer
    • Default: JsonPointer('/path')
  • target - Target

    • JSON Pointer for the place where the result is inserted in the processed item. When not set, the processed item is replaced by the resulting item.
    • Type: JSONPointer
    • Default: null
  • total - Total count

    • When processing multiple items with data, such as HTTP responses, the pipeline statistics and the progress bar keep being adjusted. When the first item / HTTP response contains information about the total number of items to be processed, this information can be set using the total option. The total option accepts an expression, where the previous item / HTTP response can be accessed with item and the parsed content body can be accessed with document.
    • Type: Expression
    • Default: null
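
For reference, several of these options can be combined in a single configuration. The following sketch is illustrative only and reuses the paginated API from the total example above:

{
    "section": "XMLReader",
    "options": {
        "path": "/path",
        "items": "/Response/items",
        "target": "/xml_data",
        "total": "{{ document.Response.total }}"
    }
}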