# Sections
Language

Parts of this documentation are generated automatically from the source code and are therefore only available in English.

An ETL pipeline is a chain of several so-called "sections". The processed items are passed from one section to the next. Each section has one task: it processes and modifies individual items before they are passed on to the next section. A section can turn a single item into several items, or drop items entirely.

The ETL pipeline consists of three kinds of work:
- "E" for "Extract": data is extracted from a source and fed into the pipeline.
- "T" for "Transform": data is transformed fully automatically according to defined rules.
- "L" for "Load": data is loaded into the digital reading room, i.e. stored or published.

To use a section in a pipeline, a JSON object is defined that has at least a `section` key, plus further optional entries:
- `section`: technical name of the section (e.g. `AttributeMapper`).
- `title` (optional): title that is displayed instead of the `section` name and better explains what this step does.
- `comment` (optional): description that aids understanding and documents the pipeline.
- `options` (optional): configuration options for the section.
Beispiel:
{
"section": "Map",
"title": "Entstehungszeitraum",
"comment": "Der Entstehungszeitraum wird anhand des Feldes 'Datum' generiert...",
"options": []
}
# AttributeMapper
Expects an option "mapping" as a list of dicts. Each dict represents an attribute mapping and has to contain the keys "source" (the key of the source attribute) and "target" (the new key of the attribute).
Only attributes listed in the "mapping" option are kept, all others are discarded.
Example:
{
"section": "AttributeMapper",
"options": {
"mapping": [
{
"source": "$TITLE",
"target": "title"
},
{
"source": "$ID",
"target": "legacy_id"
},
{
"source": "$data.signature",
"target": "signature"
},
{
"source": "$data.history",
"target": "history",
"default": ""
},
{
"source": "$data.format",
"target": "format",
"omit_empty": true
}
]
}
}
When `source` does not exist on an item and no `default` is defined, the pipeline is aborted with an error.
If a `default` value is defined, the target will be set to the `default` value in case the `source` does not exist on the item.
When `omit_empty` is set to `true`, the target key is removed from the item when the source key does not exist or the value is an empty string (`""`) or `null`.
In order to create custom fields on the fly, use the `custom` flag in the mapping item. It is best to also define labels in the required languages so that the item is properly displayed later on the website.
When defining custom fields, it is possible to set a `protected` flag (see the example below for reference). The protected flag has the effect that this particular custom field is not published to the website (Solr) and is only stored in the internal systems.
A use case for this feature is displaying protected data, such as personally identifiable information (PII), to authorized personnel in the orders management interface while publishing the record as a whole to the public without this specific metadata field and its potentially sensitive contents.
Custom fields are not protected by default.
Example:
{
"section": "AttributeMapper",
"options": {
"mapping": [
{
"custom": true,
"source": "$GESCHICHTE",
"label_de": "Geschichte",
"label_fr": "Histoire",
"label_en": "History",
"display_group": 2,
"slug": "history"
},
{
"custom": true,
"protected": true,
"source": "$SCHUTZFRISTNOTIZ",
"slug": "schutzfristnotiz",
"label_de": "Schutzfristnotiz"
}
]
}
}
# Options
- `mapping` (Field mapping): The field mapping is a list of dicts, each mapping one field value. Type: List (AttributeMapping).

Each mapping entry supports the following keys:

- `source` (Source fieldname or expression, required): The source of the value in the item can be configured with a simple field name (e.g. `title`) or with an expression (e.g. `$title` or `$metadata.title`). Type: Char.
- `custom` (Custom field): When true, the value is prepared to be imported as a custom field. When used, `target` must not be set; use `slug` instead. Type: Boolean. Default: `false`.
- `default` (Default value): Default value which is set when `source` is not set in the item. Type: JSON.
- `display_group` (ISAD(G) group number): Can be used in combination with `custom=true` in order to define in which ISAD(G) display group the custom value will be displayed in the default metadata view. Type: Choice. Choices:
  - `1`: Identity Statement ("Identifikation")
  - `2`: Context ("Kontext")
  - `3`: Content and Structure ("Inhalt und innere Ordnung")
  - `4`: Conditions of Access and Use ("Zugangs- und Benutzungsbestimmungen")
  - `5`: Allied Materials ("Sachverwandte Unterlagen")
- `label_de` (German label): Type: Char.
- `label_en` (English label): Type: Char.
- `label_fr` (French label): Type: Char.
- `omit_empty` (Omit empty): When the value is missing in the item and `omit_empty=true`, the value is removed completely from the result. Type: Boolean. Default: `false`.
- `protected` (Protected custom value): In combination with `custom=true`, setting `protected=true` excludes the custom field values from the data sent to the website / external systems. Type: Boolean. Default: `false`.
- `slug` (Custom field slug): The slug is the identifier of the custom field. It can be used optionally in combination with `custom=true` in order to make the field identifiable in other configs such as for the web. Type: Char.
- `target` (Target fieldname): The target must be configured when mapping to a product field or when the attribute mapper is used for renaming / filtering data. When the target is used, it simply sets the value under this fieldname. Type: Char.
# Compress
The compress section allows compressing a single file with `gzip`.
In order for this section to work, a previous section (such as `JSONWriter`) must have written a file to the temp directory of the ETL worker and yielded the path as an item in the pipeline.
The `Compress` section then compresses the file on disk (adding the suffix `.gz`) and replaces the path in the item which is yielded further. The original file is not kept.
Example:
{
"section": "Compress",
"options": {
"path_field": "path",
"level": 7
}
},
Input:
{"path": "/tmp/etl/json_writer/data.json"}
Output:
{"path": "/tmp/etl/json_writer/data.json.gz"}
The compressed file is automatically removed by the section after it is processed by the later sections.
# Options
- `level` (Compression level): The compression level for the `gzip` tool as a number from `1` (fast) to `9` (best). Type: Integer. Default: `null`.
- `path_field` (Path field): Name of the field in the consumed item containing the path to the file on the disk. Type: Char. Default: `'path'`.
- `result_path_field` (Resulting path field): Name of the field where the resulting path of the compressed file is written to. By default (`null`) this matches the `path_field` so that the path is overwritten. Type: Char. Default: `null`.
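To keep the compressed file's path in a separate field instead of overwriting `path`, `result_path_field` can be set explicitly. A minimal sketch (the target field name is illustrative):

```json
{
  "section": "Compress",
  "options": {
    "path_field": "path",
    "result_path_field": "compressed_path"
  }
}
```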
# CreateContainerRecordRelations
Creates or updates (based on their "identifier") container record relations for items on the pipeline.
Example:
{"section": "CreateContainerRecordRelations"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateContainers
Creates or updates (based on their "identifier") containers for items on the pipeline.
Example:
{"section": "CreateContainers"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateDeliveries
Creates or updates (based on their "identifier") deliveries for items on the pipeline.
{"section": "CreateDeliveries"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateDeliveryRecordRelations
Creates or updates (based on their "identifier") delivery record relations for items on the pipeline.
{"section": "CreateDeliveryRecordRelations"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateDescriptorRecordRelations
Creates or updates (based on their "identifier") descriptor record relations for items on the pipeline.
{"section": "CreateDescriptorRecordRelations"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateDescriptorRelations
Creates or updates (based on their "identifier") descriptor relations for items on the pipeline.
{"section": "CreateDescriptorRelations"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateDescriptors
Creates or updates (based on their "identifier") descriptors for items on the pipeline.
{"section": "CreateDescriptors"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateEnumEntries
Creates or updates enum entry objects. For updating, the unique combination of "enum" and "identifier" is used.
Configuration example:
{"section": "CreateEnumEntries"}
Accepts items in this form:
{
"enum": "archival_type",
"identifier": "pic",
"order": 1,
"label_de": "Bild",
"label_en": "Picture",
"label_fr": "Image",
"label_it": null,
"data": {"important": true},
},
Keys:
- `enum` (required): contains the identifier of the enum, which usually matches the field name.
- `identifier` (required): contains the identifier of the entry in the enum, which must be unique per `enum`.
- `order` (optional): integer value for ordering.
- `label_*` (optional): labels translated in each supported and enabled language, such as `de`, `en`, `fr`, `it`, `rm`.
- `data` (optional): JSON field for storing additional structured data in this record.
`prune_enum` option:
The `prune_enum` option makes it possible to delete and replace a whole set of enum entries with the same `enum` value without changing other existing enums. This is done by removing all existing entries for an `enum` on the first appearance of an item in the pipeline with this `enum` name. Enum entries whose `enum` name does not appear in the pipeline are kept.
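A configuration sketch that enables this behaviour using the option described above:

```json
{
  "section": "CreateEnumEntries",
  "options": {
    "prune_enum": true
  }
}
```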
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
- `prune_enum` (Prune enum): Delete all enum entries of newly imported enums. Type: Boolean. Default: `false`.
# CreateIIIFSources
Creates or updates (based on their "id") file objects for items on the pipeline. Prunes all existing relations before re-creating them.
ℹ️ The name `CreateIIIFSources` is historical; nowadays, the section is used for all kinds of files.
{"section": "CreateIIIFSources"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateLocations
Creates or updates (based on their "identifier") locations for items on the pipeline.
Example:
{"section": "CreateLocations"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreatePackages
Creates or updates (based on their "identifier") packages for items on the pipeline.
Example:
{"section": "CreatePackages"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateRecordRelations
Creates or updates (based on their "identifier") record relations for items on the pipeline.
Example:
{"section": "CreateRecordRelations"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CreateRecords
Creates or updates (based on their "identifier") records for items on the pipeline.
{"section": "CreateRecords"}
# Options
- `batch_size` (Batch size): Amount of objects to be saved to the database in one batch. Type: Integer. Default: `100`.
- `prune` (Prune objects): Delete all objects before creating new ones. Type: Boolean. Default: `false`.
# CsvFileReader
Accepts a list of strings denoting the absolute paths on the file system. Reads each CSV file in that list and returns a dict for each row.
Example:
{"section": "CsvFileReader",
"options": {"columns": ["ID_NR"]}}
# Options
- `columns` (Columns): A list of column names to add to the pipeline. If empty, all columns are added. Type: List. Default: `[]`.
# Deduplicate
Deduplicate any items by the value of a specific field.
In some cases, we need to deduplicate items when they appear twice, identified by a specific fieldname. This section does exactly that.
Example:
{
"section": "Deduplicate",
"options": {
"field": "identifier"
}
}
# Options
- `field` (Field name, required): Name of the field containing the unique value which is used for deduplicating items that have the very same value in this field. Type: Char.
# DeleteItems
Takes a list of objects expecting a key named `identifier_field`. All entries of the model `model` matching the list of identifiers are deleted. Yields the initial list again.
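A configuration sketch using the documented options (the chosen model and field names are illustrative):

```json
{
  "section": "DeleteItems",
  "options": {
    "model": "record",
    "identifier_field": "identifier",
    "lookup_field": "identifier"
  }
}
```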
# Options
- `model` (Model name, required): Name of the model to delete from, all in lowercase characters. Type: Choice. Choices:
  - `record`: Record ("Datensatz")
  - `recordrelation`: Record relation ("Verknüpfung von Verzeichnungseinheiten")
  - `file`: Files
  - `descriptor`: Descriptor ("Deskriptor")
  - `descriptorrelation`: Descriptor relation ("Verknüpfung von Deskriptoren")
  - `descriptorrecordrelation`: Descriptor record relation ("Verknüpfung von Deskriptoren und Verzeichnungseinheiten")
  - `delivery`: Delivery ("Ablieferung")
  - `deliveryrecordrelation`: Delivery record relation ("Verknüpfung von Ablieferungen und Verzeichnungseinheiten")
  - `container`: Container ("Behältnis")
  - `containerrecordrelation`: Container record relation ("Verknüpfung von Behältnissen und Verzeichnungseinheiten")
  - `enumentry`: Enum entry ("Enum Wert")
  - `package`: Package
- `identifier_field` (Identifier field name): Name of the field on the item in the pipeline containing the value to look for in `lookup_field`. Type: Char. Default: `'identifier'`.
- `lookup_field` (Model field name): Name of the field on the `model` used to look up the values from `identifier_field`. Type: Char. Default: `'identifier'`.
# ExcelReader
Accepts a list of strings denoting the absolute paths on the file system. Reads each Excel file in that list and returns a dict for each row.
Example:
{"section": "ExcelReader",
"options": {"columns": ["ID_NR"]}}
Renaming columns:
In the ETL pipeline, the keys should be technical. Characters such as `.` or `,` can break certain sections in the pipeline. In such cases, the fields can simply be renamed.
{
"section": "ExcelReader",
"options": {
"columns": ["Inventarnr. verwandte Objekte"],
"rename_columns": {"Inventarnr. verwandte Objekte": "inventarnr_verwandte_objekte"}
}
}
# Options
- `columns` (Column names): A list of column names to preserve. If empty, all columns are preserved. Type: List. Default: `[]`.
- `rename_columns` (Rename columns): Mapping of columns to rename, where the key is the name of the Excel column and the value is the new name it should have. Type: Dict. Default: `{}`.
- `sheet` (Sheet name): The name of the Excel sheet to read from. When not set, the first sheet is read. Type: Char. Default: `null`.
# Expand
The `Expand` section is used for repeating an item. With this section, an item can be repeated for each value of a list stored in a property of the item.
Example Pipeline definition:
{
"section": "Expand",
"options": {
"fields": ["relation_target"]
}
}
Input:
[
{"relation_source": 1, "relation_target": [2, 3, 4]}
]
Output:
[
{"relation_source": 1, "relation_target": 2},
{"relation_source": 1, "relation_target": 3},
{"relation_source": 1, "relation_target": 4}
]
When one of the fields is missing or the value is empty or falsy, the item will not be passed.
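The `split` and `strip` options described below can be used when the source field contains a delimited string instead of a list. A minimal sketch (field name and delimiter are illustrative):

```json
{
  "section": "Expand",
  "options": {
    "field": "keywords",
    "split": ";",
    "strip": " "
  }
}
```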
# Options
- `field` (Field name): Name of the field that contains a list by which the items should be expanded. `field` and `fields` are mutually exclusive. Type: Char. Default: `null`.
- `fields` (Field names): List of fieldnames to expand at the same time. Each field must have the same amount of values. `field` and `fields` are mutually exclusive. Type: List. Default: `null`.
- `split` (Split by character): Split the string value of the fields by this character. Type: Char. Default: `null`.
- `strip` (Strip characters): Strips the characters defined in this field from all values of the expanded lists. Type: Char. Default: `null`.
- `target` (Target field name): Name of the field that the expanded field value should be stored in. When not defined, the field is replaced in place and the original value is dropped. `target` cannot be combined with `fields`. Type: Char. Default: `null`.
# ExtractZip
The `ExtractZip` section allows extracting ZIP files that were placed in the pipeline by a previous section such as `FileSource`.
It expects an item with a path to a ZIP file and, after extraction, yields an item with a path for each file in the ZIP.
Example item:
{
"path": "/tmp/etl/zip/foo.txt"
}
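The section documents no options below; a minimal pipeline entry is presumably just the section name (a sketch):

```json
{"section": "ExtractZip"}
```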
# Options
# FileDeduplicator
The file deduplicator section has the job of reducing the number of processed files by skipping duplicated images.
The use case: when one specific record has multiple files, containing the original (which we assume to be a TIF) and lower-quality preview copies (which we assume to be JPG), we only want to keep the TIF.
**Example:**
```json
{
  "section": "FileDeduplicator",
  "options": {
    "filename_field": "DATEINAME",
    "record_identifier_field": "ID_NR_VRZNG_ENHT"
  }
}
```
# Options
- `filename_field` (Filename field name): The name of the field containing filenames or paths. The value of the field is used for identifying duplicates based on the filename stem and for identifying the extension when selecting the best version of the image. Type: Char. Default: `'DATEINAME'`.
- `record_identifier_field` (Record identifier field name): The name of the field containing the identifier of the record. Type: Char. Default: `'ID_NR_VRZNG_ENHT'`.
# FileDownloader
The `FileDownloader` section has the purpose of downloading files from a source system and storing them on a temporary volume for further processing by other sections and services.
The section works in combination with a `file-downloader` service, which runs in a separate container. The source system must be configured properly in the file downloader section.
Source types:
- HTTP / HTTPS (`http`): The file downloader service can download files from an HTTP server.
- S3 (`s3`): Downloads the files from an S3 service, supporting S3 authentication configured in the service.
- Fedora Repository (`fedora`): Downloads an AIP package (jar file) from a Fedora Repository service, supporting authentication configured in the service. The AIP archive is then ZIP-extracted and the files are processed later by the publisher section.
Path/URL replacing:
In order to create an absolute path which works on the source system based on a URL or filesystem path, we often have to replace parts of the string. The section has built-in functionality for doing that based on regular expressions.
The example below shows a configuration for this conversion:
- input: `file:///Q:/2018-00_P-S&N/STA_H_43_54/STA_H_43_54.xml`
- output: `/2018-00_P-S&N/STA_H_43_54/STA_H_43_54.xml`
Example:
{
"section": "FileDownloader",
"options": {
"identifier_field": "ID_NR_DATEI",
"source_path_field": "URL",
"source_path_regexp_replace": [
"file:///[A-Z]:(.*)",
"\1"
]
}
}
This section's output is meant to be consumed by a `FilePublisher` section later. The section especially yields the attributes `public_path` and `temp_files`.
Giving URLs as input:
The section can also be used to download files from an HTTP server. In this case, the `source_type` option must be set to `http`. The `source_path_field` has to point to a field containing the URL in the previous section's output. The `config_name` can be omitted in this case.
The file might not have an identifier since it is not coming from a database. To generate a unique identifier, use the `Map` section and the `generate_identifier` function.
Example:
{
"section": "FileDownloader",
"options": {
"source_type": "http",
"identifier_field": "identifier",
"source_path_field": "URL"
}
}
# Options
- `source_path_field` (Source path field name, required): Name of the field containing a path or URL with the path and name of the file on the source system. The path extracted from here is expected to be usable as a path in the URL on the source system. Type: Char.
- `config_name` (File downloader source config name): Name of the preconfigured file downloader source config which was set up when installing the system. The available config names differ from installation to installation. Type: Char.
- `identifier_field` (Identifier field name): Name of the field containing the unique identifier of the file on the source system. Type: Char. Default: `'ID_NR_DATEI'`.
- `source_path_regexp_replace` (Source path regexp replace list): A search/replace regexp pattern, represented as a list of two strings, the first containing a search pattern, the second a replace pattern. Type: List.
- `source_type` (Source type): The type of the source system. Type: Choice. Choices:
  - `s3`: S3 (default)
  - `http`: HTTP / HTTPS
  - `fedora`: Fedora Repository
# FilePublisher
The file publisher section has the goal of publishing files to the public S3 bucket, so that they can be displayed on the web.
The pipeline must be configured so that files are only fed to this section when their records are public (`is_public`) and their files may also be public (`files_are_public`).
In order for the file publisher section to work properly, a set of publisher services in separate containers is required. Depending on the file type, these services have resource-intensive tasks to do, such as preparing the images for an IIIF viewer.
This section is expected to be used in combination with the `FileDownloader` section, which adds items to the pipeline containing paths to the files on the shared temporary volume. It especially requires the attributes `public_path` and `temp_files` on the items.
Pipeline version:
File publishing is in general a slower task compared to only processing metadata. Therefore, it is important to have a mechanism for identifying situations where the work is already done and does not need to be redone.
This is implemented with a "pipeline version" feature. As a pipeline version value we can define an arbitrary version number of the current configuration of the pipeline and the worker services. When we rerun the same pipeline, files will be skipped that were already processed with the currently configured pipeline version. In this case, the section aborts the file processing and removes the file from the pipeline, since it is already processed.
Whitelist and Blacklist:
In this section, you can filter files using a whitelist and blacklist configuration which are lists of file extensions. If the whitelist is enabled, only files with the specified extensions will be processed. If the blacklist is enabled, files with the specified extensions will not be processed. If both are enabled, the blacklist will take priority over the whitelist.
For example, if you want to process only pdf files, you can set the whitelist to
{
"section": "FilePublisher",
"options": {
"whitelist": ["pdf", "pdfa"]
}
}
The file extensions are always compared in lowercase.
Example:
{
"section": "FilePublisher",
"options": {
"pipeline_version": "17.2"
}
}
### Publisher arguments
The publisher arguments are passed to the publisher services. The arguments are
grouped by the media type of the files. The available arguments are:
- **image**: `skip_compression` (default: `false`): If set to `true`, the image
will not be processed by the image publisher service. Only the thumbnail will
be created and uploaded.
**Example:**
```json
{
"section": "FilePublisher",
"options": {
"publisher_args": {
"image": {
"skip_compression": true
}
}
}
}
```
# Options
- `blacklist` (Blacklist): If set, file extensions that are included in the list are not processed. Type: List.
- `package_processing` (Package processing): If enabled, the section will process packages (e.g. AIPs). If disabled, package files are processed individually. Type: Boolean. Default: `false`.
- `pipeline_version` (Pipeline version): The version of the pipeline configuration. Files that were already processed with this specific version are skipped in future runs. Type: Char.
- `publisher_args` (Publisher args): Additional arguments that are passed to the publisher services. Type: JSON.
- `whitelist` (Whitelist): If set, only file extensions included in the list are processed. Type: List.
# FileSource
The `FileSource` section allows manually uploading a file for processing. The file could be an Excel file that is read and processed by the `ExcelReader` section.
The section is not meant to be used for automatic / nightly runs, since the user has to manually upload a file when the pipeline is executed.
The section yields items with paths:
[
{
"path": "/path/to/uploaded-file.xlsx"
}
]
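Because the file is provided interactively at run time, a pipeline entry typically needs no options (a sketch):

```json
{"section": "FileSource"}
```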
# Options
# Filter
Filter items by property values.
The filter section allows dropping items from the pipeline based on basic value comparisons. Simple comparison operators can be used, as well as `and`, `or` and `not`.
All expressions are represented as a simple list.
Keep items with a truthy field value:
{"keep": ["$is_public"]}
Using comparison operators:
{"keep": ["$is_public", "==", True]}
{"keep": ["$is_public", "!=", False]}
{"keep": ["$size", ">=", 10]}
{"keep": ["$title", "is not", None]}
Using and / or expressions:
{"keep": ["or",
["$schutzfrist_in_vergangenheit"],
["and"
["$status", "==", "Abgeschlossen"],
["$auf_dem_portal_sichtbar", "==", "Schutzfrist ignorierend"]]]}
Using not:
{"keep": ["and",
["$schutzfrist_in_vergangenheit"],
["not", ["$status", "==", "Abgeschlossen"]]]
# Options
- `keep` (Keep conditions, required): Contains a condition definition. All items for which the condition has a truthy result are kept in the pipeline. Items that do not meet the condition are dropped from the pipeline. Type: JSON.
# HTTP
The `HTTP` source section enables you to perform an HTTP request to any API endpoint and provide the payload as a temporary file for further processing in a subsequent section.
Illustrative Example:
{
"section": "HTTP",
"options": {
"url": "https://example.com/api/items",
"params": {
"query": "Test"
}
}
}
This will introduce an item into the pipeline in the following format:
{
"path": "/tmp/etl/http/response-body-f34ds7fjlk",
"status_code": "200",
"reason": "OK",
"headers": {
"Content-Type": "application/json"
}
}
Following this, the next section can access the file from the path
and conduct suitable processing.
# Custom Request Headers
Custom request headers can be configured using the `headers` option:
{
"section": "HTTP",
"options": {
"url": "https://example.com/api/items",
"headers": {
"Accept": "application/json"
}
}
}
# Basic Authentication
Where an API mandates basic authentication, this can be accomplished by configuring a list of username and password within the `auth` option:
{
"section": "HTTP",
"options": {
"url": "https://example.com/api/items",
"auth": ["api-user", "secret"]
}
}
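Credentials can also reference environment variables, as described for the `auth` option below. A sketch assuming the environment variable `ETL_ENV_API_PASSWORD` is set on the worker:

```json
{
  "section": "HTTP",
  "options": {
    "url": "https://example.com/api/items",
    "auth": ["api-user", "$env.API_PASSWORD"]
  }
}
```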
# OAuth
The section supports the OAuth Backend Application flow to obtain an access token that is used to access an API. This can be configured using the `oauth` option:
{
"section": "HTTP",
"options": {
"url": "https://example.com/api/items",
"oauth": {
"client_id": "api-client-id",
"client_secret": "api-client-secret",
"token_url": "https://example.com/api/accesstoken"
}
}
}
# foreach
The section supports operating on previous items in the pipeline using the `foreach` option.
It will call `url` for each item yielded by the previous section and store the result of the request in the `target_path` of the item.
Expressions can be used to build the `url` based on attributes of the item.
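A minimal sketch using only the documented options (URL and target path are illustrative; in practice the `url` expression would usually reference attributes of the consumed item):

```json
{
  "section": "HTTP",
  "options": {
    "url": "https://example.com/api/items",
    "foreach": {
      "target_path": "details"
    }
  }
}
```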
# Handling Response Codes
The section will trigger an exception when the response code falls outside the successful range, specifically when it is not a status code in the "2xx" range. In cases where the server responds with a server error (beginning with "5xx") or a client error (beginning with "4xx"), the section will terminate with an exception.
# Pagination Capability
The HTTP
section offers support for paginating the requested API endpoint,
provided that the response body is compatible.
To enable pagination, the responding API must specify the `content-type` header as one of the supported content types (detailed below).
For each subsequent request, this section will generate and insert an item with a payload file path. It's important to note that this approach impacts the total number of requests in the pipeline run stats. This is because, in this scenario, it's impossible to predict the exact number of requests that will be made during pagination.
# Supported Content Types
In order to obtain the necessary information for pagination from the response body, the response body must be parsed and compatible with the supported content types. All pagination variants are compatible with these content types.
# JSON documents
When working with JSON payloads, you can select specific values using the JSON Pointer syntax. For more details, please refer to the JSON Pointer documentation.
Content types:
application/json
application/ld+json
# XML documents
For XML documents, values can be extracted using XPath.
Content types:
application/xml
text/xml
# Paginators
# Cursor-Based Paginator
The `cursor_pagination` method expects a cursor value in the response, which serves as a reference to the next page. In this method, the cursor should accurately represent the next page to be fetched.
Here's an example payload structure:
{
"data": {
"programmePage": {
"cursor": "MSBnZWdlbiAxMDAgLSBTb21tZXJzcGVjaWFs",
"edges": [
{},
{}
]
}
}
}
To configure the paginator for this method, you can use the following settings:
{
"section": "HTTP",
"options": {
"url": "http://api.server.com/items",
"cursor_pagination": {
"cursor_path": "/data/programmePage/cursor",
"cursor_param": "cursor"
}
}
}
By specifying these configurations, the section will effectively manage pagination using the cursor values found in the response, ensuring the retrieval of subsequent pages of data.
# Next URL-Based Paginator
The `next_pagination` method is based on extracting a fully qualified URL from the response document to determine the URL of the next page.
For instance, if the requested API provides a payload with a "next" URL like this:
{
"pagination": {
"next": "http://api.server.com/items?page=2"
},
"results": [{"item": 1}]
}
You can extract the next URL using a JSON Pointer expression, as demonstrated in the following configuration:
{
"section": "HTTP",
"options": {
"url": "http://api.server.com/items",
"next_pagination": {
"next_url_path": "/pagination/next"
}
}
}
This approach allows the section to dynamically determine and follow the next page URL provided in the response, facilitating effective pagination.
# Offset-Based Paginator
The `offset_pagination` method facilitates pagination based on a specified starting index, referred to as the offset. In a scenario where a page contains, for instance, 50 items, the initial request would use a `start` parameter set to `0`, and subsequent requests would increment the start value to `50`, or alternatively, depending on the API's indexing scheme, potentially `51`.
To make this approach work effectively, the requested API must provide the following values in the response:
- `offset`: the index of the first element in the current page.
- `limit`: the maximum number of items on the current page.
- `total`: the total number of items across all pages.

The section will stop paginating when any of these values is missing, or when the calculated `offset` for the next page exceeds the `total` number of items available.
Here is an example of a payload structure:
<root start="51" page_size="50" total="237">
<items>
...
</items>
</root>
To configure the paginator for this method, you can use the following settings:
{
"section": "HTTP",
"options": {
"url": "http://api.server.com/items",
"offset_pagination": {
"offset_path": "/root/@start",
"offset_param": "start",
"limit_path": "/root/@page_size",
"limit_param": "size",
"total_path": "/root/@total"
}
}
}
This configuration allows the section to manage pagination effectively based on the offset values provided in the response, ensuring the retrieval of all relevant data.
# Page-Based Paginator
The `page_pagination` method operates by simply incrementing a page number in the request parameter, assuming that the first page is numbered `1`.
To determine when to stop, the configuration should include the `items_path`, which enables the paginator to identify whether items were returned in the response or not.
Here's an example of a payload structure:
{
"data": [
{},
{}
]
}
To configure the paginator for this method, you can use the following settings:
{
"section": "HTTP",
"options": {
"url": "http://api.server.com/items",
"page_pagination": {
"page_param": "page",
"items_path": "/data"
}
}
}
With these configurations, the section will effectively manage pagination by incrementing the page number and checking for the presence of items in the response, ensuring the retrieval of subsequent pages of data.
# Options
- `url` (URL, required): The URL which should be requested. Type: Expression.
- `auth` (Authentication): List of username and password in plaintext, submitted as basic authentication. Environment variables can be referenced using `$env.`. Example: `$env.EXAMPLE_SECRET` uses the value of the environment variable `ETL_ENV_EXAMPLE_SECRET`. Type: List. Default: `null`.
- `cleanup` (Cleanup files): When enabled, the temporary payload file is automatically removed after passing on the item in the pipeline. Type: Boolean. Default: `true`.
- `cookies` (Cookies): Dict of cookies to send with the request. Type: Dict. Default: `null`.
- `cursor_pagination` (Cursor pagination): Type: Dict. Default: `null`. Keys:
  - `cursor_param` (Cursor query param name, required): The query param name to use for the cursor value in the next request. Type: Char.
  - `cursor_path` (Path to the cursor value, required): The cursor value must contain the cursor for the current page. Type: Char.
- `foreach` (Foreach): Type: Dict. Keys:
  - `target_path` (Path to store the result, required): Path to the key on the item in the pipeline where the result will be stored. `""` can be used to replace the entire item. Type: Char.
- `headers` (Headers): Request headers to be sent. Type: Dict. Default: `null`.
- `json` (JSON Payload): A JSON-serializable object to send in the body of the request. Type: JSON. Default: `null`.
- `method` (HTTP Method): The HTTP method with which the requests should be made. Type: Choice. Default: `'GET'`. Choices:
  - `GET`: GET
- `next_pagination` (Next pagination): Type: Dict. Default: `null`. Keys:
  - `next_url_path` (Next URL path, required): Path to the URL of the next page in the response document. Type: Char.
- `oauth` (OAuth): Type: Dict. Keys:
  - `client_id` (Client ID, required): Environment variables can be referenced using `$env.`. Example: `$env.EXAMPLE_SECRET` uses the value of the environment variable `ETL_ENV_EXAMPLE_SECRET`. Type: EnvChar.
  - `client_secret` (Client Secret, required): Environment variables can be referenced using `$env.`. Example: `$env.EXAMPLE_SECRET` uses the value of the environment variable `ETL_ENV_EXAMPLE_SECRET`. Type: EnvChar.
  - `token_url` (URL used to get an access token, required): Type: Char.
- `offset_pagination` (Offset pagination): Type: Dict. Default: `null`. Keys:
  - `limit_path` (Path to the limit value, required): The limit value must contain the maximum amount of items on one page. Type: Char.
  - `offset_param` (Offset query param name, required): The query param name to use for the offset value in the next request. Type: Char.
  - `offset_path` (Path to the offset value, required): The offset value must contain the index of the first element of the current page. Type: Char.
  - `total_path` (Path to the total value, required): The total value must contain the total amount of items across all pages. Type: Char.
  - `limit_param` (Limit query param name): The query param name with which the limit is sent in the next request. This is optional. When `null`, the query param is omitted. Type: Char. Default: `null`.
- `page_pagination` (Page pagination): Type: Dict. Default: `null`. Keys:
  - `items_path` (Path to the items, required): When there are no items, the pagination is stopped. Type: Char.
  - `page_param` (Page query param name, required): The query param name to use for the page value in the next request. Type: Char.
- `params` (Query Params): Request params to be sent in the query string. Type: Dict. Default: `null`.
- `timeout` (Timeout): How many seconds to wait for the server to send data before giving up, as a float. Type: Float. Default: `null`.
- `verify` (Verify TLS certificate): Control whether we verify the server's TLS certificate. Type: Boolean. Default: `true`.
# JSONReader
The `JSONReader` source section allows reading JSON files and yielding the JSON items as a list.
This section expects the previous item to contain the path to a JSON file (by default under `/path`).
Example:
[
{
"path": "/path/to/file.json"
}
]
Let's say we have a JSON file with the following structure:
{
"groupSet": [
{
"offers": [
{"name": "Offers 1"},
{"name": "Offers 2"}
]
},
{
"offers": [
{"name": "Offers 3"},
{"name": "Offers 4"}
]
}
]
}
We can use the `JSONReader` section to read the `offers` items from the first element of the `groupSet` array:
{
"section": "JSONReader",
"options": {
"items": "/groupSet/0/offers"
}
}
JSON Pointer syntax can be used to specify the path to data in a JSON file; it is a string of tokens separated by `/`. For more information about the JSON Pointer syntax, please refer to the JSON Pointer documentation.
Invalid data paths will result in an empty list being returned. It will always return a list, even if the data path points to a dictionary.
# Options
- `items` (Items): Path to the items in the JSON file. Type: Char. Default: `''`.
- `path` (Path): Path to the key on the item containing the path to the JSON file. Type: Char. Default: `'/path'`.
# JSONWriter
The JSONWriter section consumes items and writes them to a JSON-file as a list of items. The path to the file is yielded to the next section which may process it further.
In detail, the section writes all items consumed from the prior section into files on the temp file system of the ETL worker. It does not yield the consumed items to the next section but it yields only one item per written file, which contains the path to the file on the disk.
The written files are automatically removed after they are processed by the next section in order to keep disk usage low.
The items are written in batches (configurable through `batch_size`). The `filename` is also configurable and acts as a template, where these values are replaced:
- `$batch`: the batch number, starting with `1`.
- `$date`: the date when the pipeline was executed, in the form `YYYYMMDD`.
- `$time`: the date and time in the form `YYYYMMDD-hhmmss`; the date and time is the same for all written batch files for convenience, thus it does not represent the time the file was written but the time the pipeline run was started.
Example configuration:
{
"section": "JSONWriter",
"options": {
"batch_size": 200,
"filename": "data-$date-$batch.json"
}
},
Example output with two batches:
[
{"path": "/tmp/etl/json_writer/data-20221229-1.json", "size": 200},
{"path": "/tmp/etl/json_writer/data-20221229-2.json", "size": 173},
]
It is also possible to use simple item properties in the `filename` template. This has the effect that the items are grouped into multiple JSON output files.
Example configuration:
{
"section": "JSONWriter",
"options": {
"batch_size": 200,
"filename": "data-${archival_type}-${batch}.json"
}
},
Only simple item properties such as strings, integers and booleans can be used in the filenames. Nested data structures such as lists and dicts are not supported.
WARNING
Be aware that when `batch_size` is set to a non-zero value and `$batch` is missing in the `filename` template, the section will yield the same file path / filename multiple times with multiple batches of items. Depending on what is done with the files, the file may be overwritten on the target system, which may result in missing data.
# Options
- `batch_size` (Batch size): Amount of objects to be written into one file. Set it to 0 in order to write all items into one file. Type: Integer. Default: `1000`.
- `filename` (Filename): Filename template of the JSON file. Type: Char. Default: `'data-$batch.json'`.
- `pretty` (Pretty print): Pretty-print items on multiple lines. Type: Boolean. Default: `false`.
# Logger
The logger section is used for logging data to the `stdout` of the worker process. This is useful for inspecting the behavior of a pipeline.
The logger section re-yields all consumed items to the next section.
Example:
{
"section": "Logger",
"options": {
"prefix": "Published",
"fields": [
"ID_NR_DATEI",
"public_path"
],
"oneline": true
}
}
# Options
- `fields` (List of field names): A list of fields to be logged for each item. This helps to reduce the amount of output for large items. Type: List. Default: `null`.
- `limit` (Limit): Only log the first n items consumed in order not to fill the log. Type: Integer. Default: `100`.
- `oneline` (One line): Flag for disabling pretty-printing items on multiple lines. Especially useful in combination with `fields` in order to make the output more compact. Type: Boolean. Default: `false`.
- `prefix` (Prefix): Prefix for each log entry in order to identify which logger produced the output. This is helpful when combining multiple loggers in the same pipeline. Type: Char. Default: `'Logger'`.
# Map
The map section applies functions defined as operations on all items in the pipeline and acts as the "transform" part of the ETL pipeline.
The map section accepts a list of operations, where each operation consists of a dict with these keys:
- `function`: string with the name of the function to apply.
- `args`: list of arguments for the function; string values with a `$` prefix are replaced by the item value identified by the name following the `$`. Nested values inside a dict can be accessed with dot notation, e.g. `$field.nested_key.double_nested_key`. This notation also supports selecting list elements, for instance with `$list.0` or `$list.-1`.
- `kwargs`: optional keyword arguments for the function, given as a dict.
- `target`: the name of the field in which the output is stored within the item.
There are many functions that can be used within the Map section.
One special function is the `condition` function, which makes it possible to evaluate a condition on the item and store the boolean result in the item.
Examples:
In the next example, the string function `split` is applied to the value of the field `ARCHIVALIENART` in order to split the string by newlines into a list of strings.
{
"section": "Map",
"options": {
"operations": [
{
"args": [
"$ARCHIVALIENART",
"\r\n\r\n"
],
"target": "archival_type",
"function": "split"
}
]
}
}
In the next example, we create a datetime string and store it in the field `_today` on the item. The string can later be used for comparing dates.
{
"section": "Map",
"options": {
"operations": [
{
"target": "_today",
"function": "now"
},
{
"args": [
"$_today",
"%Y-%m-%d"
],
"target": "_today",
"function": "strftime"
}
]
}
}
In the next example, the `Map` section is used to compare the date string `SCHUTZFRISTENDE` to the current date string stored in `_today`.
{
"section": "Map",
"options": {
"operations": [
{
"args": [
"and",
[
"$SCHUTZFRISTENDE",
"is not",
null
],
[
"$SCHUTZFRISTENDE",
"<",
"$_today"
]
],
"target": "is_approval_required",
"function": "condition"
}
]
}
}
# Map functions
Below, the functions usable in the `Map` section are listed and documented. Most functions are simply Python functions that are registered in the `Map` section; thus the documentation is extracted from the standard Python functions and may need slight adaptation to how the `Map` section uses them.
# abs
Return the absolute value of the argument.
# all
Return True if bool(x) is True for all values x in the iterable.
If the iterable is empty, return True.
# any
Return True if bool(x) is True for any x in the iterable.
If the iterable is empty, return False.
# ascii
Return an ASCII-only representation of an object.
As repr(), return a string containing a printable representation of an object, but escape the non-ASCII characters in the string returned by repr() using \x, \u or \U escapes. This generates a string similar to that returned by repr() in Python 2.
# bool
bool(x) -> bool
Returns True when the argument x is true, False otherwise. The builtins True and False are the only two instances of the class bool. The class bool is a subclass of the class int, and cannot be subclassed.
# bytes
bytes(iterable_of_ints) -> bytes bytes(string, encoding[, errors]) -> bytes bytes(bytes_or_buffer) -> immutable copy of bytes_or_buffer bytes(int) -> bytes object of size given by the parameter initialized with null bytes bytes() -> empty bytes object
Construct an immutable array of bytes from:
- an iterable yielding integers in range(256)
- a text string encoded using the specified encoding
- any object implementing the buffer API.
- an integer
# capitalize
Return a capitalized version of the string.
More specifically, make the first character have upper case and the rest lower case.
# chr
Return a Unicode string of one character with ordinal i; 0 <= i <= 0x10ffff.
# condition
Apply a condition and store the boolean result.
The condition function accepts a condition in the same way filter conditions can be defined in the filter section. The difference is that the result can be stored as a boolean in a field when used in the `Map` section.
See also the separate chapter on conditions in the ETL pipeline.
Example:
{
"section": "Map",
"options": {
"operations": [
{
"args": [
"$ZUGAENGLICHKEIT_ID",
"in",
[10007, 10009]
],
"target": "files_are_public",
"function": "condition"
}
]
}
}
# count
S.count(sub[, start[, end]]) -> int
Return the number of non-overlapping occurrences of substring sub in string S[start:end]. Optional arguments start and end are interpreted as in slice notation.
# date_to_int
The function takes a date string that can be parsed into a date (using `dateparse.parse_date`) and converts it into an integer value. E.g. `2022-06-19` will be converted to the integer `20220619`.
Examples:
{
"args": [
"2022-06-19",
],
"target": "output",
"function": "date_to_int",
},
{
"args": [
"$field",
],
"target": "other_output",
"function": "date_to_int",
}
# datetime
datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])
The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.
# dict
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
# endswith
S.endswith(suffix[, start[, end]]) -> bool
Return True if S ends with the specified suffix, False otherwise. With optional start, test S beginning at that position. With optional end, stop comparing S at that position. suffix can also be a tuple of strings to try.
# enumerate
Return an enumerate object.
iterable an object supporting iteration
The enumerate object yields pairs containing a count (from start, which defaults to zero) and a value yielded by the iterable argument.
enumerate is useful for obtaining an indexed list: (0, seq[0]), (1, seq[1]), (2, seq[2]), ...
# find
S.find(sub[, start[, end]]) -> int
Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.
Return -1 on failure.
# float
Convert a string or number to a floating point number, if possible.
# format
S.format(*args, **kwargs) -> str
Return a formatted version of S, using substitutions from args and kwargs. The substitutions are identified by braces ('{' and '}').
# format_map
S.format_map(mapping) -> str
Return a formatted version of S, using substitutions from mapping. The substitutions are identified by braces ('{' and '}').
# generate_identifier
The function takes a string and returns a hash value of the string. The hash value is truncated to 32 characters. Mainly used for generating an identifier for a string. This will always generate the same hash value for the same string.
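A `Map` operation sketch using this function (the source field name is illustrative):

```json
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": ["$URL"],
        "target": "identifier",
        "function": "generate_identifier"
      }
    ]
  }
}
```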
# hash
Return the hash value for the given object.
Two objects that compare equal must also have the same hash value, but the reverse is not necessarily true.
# hex
Return the hexadecimal representation of an integer.
hex(12648430) '0xc0ffee'
# index
S.index(sub[, start[, end]]) -> int
Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.
Raises ValueError when the substring is not found.
# int
int([x]) -> integer int(x, base=10) -> integer
Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.int(). For floating point numbers, this truncates towards zero.
If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.
int('0b100', base=0) 4
# isalnum
Return True if the string is an alpha-numeric string, False otherwise.
A string is alpha-numeric if all characters in the string are alpha-numeric and there is at least one character in the string.
# isalpha
Return True if the string is an alphabetic string, False otherwise.
A string is alphabetic if all characters in the string are alphabetic and there is at least one character in the string.
# isascii
Return True if all characters in the string are ASCII, False otherwise.
ASCII characters have code points in the range U+0000-U+007F. Empty string is ASCII too.
# isdecimal
Return True if the string is a decimal string, False otherwise.
A string is a decimal string if all characters in the string are decimal and there is at least one character in the string.
# isdigit
Return True if the string is a digit string, False otherwise.
A string is a digit string if all characters in the string are digits and there is at least one character in the string.
# isidentifier
Return True if the string is a valid Python identifier, False otherwise.
Call keyword.iskeyword(s) to test whether string s is a reserved identifier, such as "def" or "class".
# islower
Return True if the string is a lowercase string, False otherwise.
A string is lowercase if all cased characters in the string are lowercase and there is at least one cased character in the string.
# isnumeric
Return True if the string is a numeric string, False otherwise.
A string is numeric if all characters in the string are numeric and there is at least one character in the string.
# isprintable
Return True if the string is printable, False otherwise.
A string is printable if all of its characters are considered printable in repr() or if it is empty.
# isspace
Return True if the string is a whitespace string, False otherwise.
A string is whitespace if all characters in the string are whitespace and there is at least one character in the string.
# istitle
Return True if the string is a title-cased string, False otherwise.
In a title-cased string, upper- and title-case characters may only follow uncased characters and lowercase characters only cased ones.
# isupper
Return True if the string is an uppercase string, False otherwise.
A string is uppercase if all cased characters in the string are uppercase and there is at least one cased character in the string.
# join
Join a list of items as strings into one string. The following `kwargs` can be used:
- `separator`: The separator used to join the items. Defaults to `-`.
- `flatten`: A boolean indicating whether the list should be flattened before joining. Defaults to `false`.
Example:
{
"section": "Map",
"options": {
"operations": [
{
"function": "join",
"args": ["$record_id", "$descriptor_id"],
"kwargs": {"separator": "-"},
"target": "identifier"
}
]
}
}
# len
Return the number of items in a container.
# list
Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.
# ljust
Return a left-justified string of length width.
Padding is done using the specified fill character (default is a space).
# lower
Return a copy of the string converted to lowercase.
# lstrip
Return a copy of the string with leading whitespace removed.
If chars is given and not None, remove characters in chars instead.
# max
max(iterable, *[, default=obj, key=func]) -> value max(arg1, arg2, *args, *[, key=func]) -> value
With a single iterable argument, return its biggest item. The default keyword-only argument specifies an object to return if the provided iterable is empty. With two or more arguments, return the largest argument.
# min
min(iterable, *[, default=obj, key=func]) -> value min(arg1, arg2, *args, *[, key=func]) -> value
With a single iterable argument, return its smallest item. The default keyword-only argument specifies an object to return if the provided iterable is empty. With two or more arguments, return the smallest argument.
# now
Returns new datetime object representing current time local to tz.
tz Timezone object.
If no tz is specified, uses local timezone.
# nullif
Returns `None` if the condition is true. Otherwise, the value is returned.
See also the separate chapter on conditions in the ETL pipeline.
Examples:
{
"args": [
"$input",
["$input", "==", "unknown"],
],
"target": "output",
"function": "nullif",
}
{
"args": [
"$input",
["and", ["$input", "is not", None], ["$input", "<", "0001-01-01"]],
],
"target": "output",
"function": "nullif",
}
# oct
Return the octal representation of an integer.
For example, oct(342391) returns '0o1234567'.
# ord
Return the Unicode code point for a one-character string.
# parse_datetime
Parse a string and return a datetime.datetime.
This function supports time zone offsets. When the input contains one,
the output uses a timezone with a fixed offset from UTC.
Raise ValueError if the input is well formatted but not a valid datetime.
Return None if the input isn't well formatted.
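A possible Map operation using parse_datetime, assuming the date string is passed as the single argument (the field names date_raw and date_parsed are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "parse_datetime",
        "args": ["$date_raw"],
        "target": "date_parsed"
      }
    ]
  }
}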
# partition
Partition the string into three parts using the given separator.
This will search for the separator in the string. If the separator is found, returns a 3-tuple containing the part before the separator, the separator itself, and the part after it.
If the separator is not found, returns a 3-tuple containing the original string and two empty strings.
# pick
When provided a list of keys, it picks all corresponding values from a list of dicts.
Example 1:
{
"section": "Map",
"options": {
"operations": [
{
"args": ["$people", "name"],
"target": "list_of_names",
"function": "pick"
}
]
}
}
item:
{
"people": [{"name": "Hans"}, {"name": "Peter"}]
}
returns:
{
"people": [{"name": "Hans"}, {"name": "Peter"}],
"list_of_names": ["Hans","Peter"]
}
Example 2:
{
"section": "Map",
"options": {
"operations": [
{
"args": ["$people", "name", "age"],
"target": "list_of_names",
"function": "pick"
}
]
}
}
item:
{
"people": [{"name": "Hans", "age": 20}, {"name": "Peter", "age": 21}]
}
returns:
{
"people": [{"name": "Hans", "age": 20}, {"name": "Peter", "age": 21}],
"list_of_names": [("Hans", 20),("Peter", 21)]
}
# pow
Equivalent to base**exp with 2 arguments or base**exp % mod with 3 arguments.
Some types, such as ints, are able to use a more efficient algorithm when invoked using the three argument form.
# range
range(stop) -> range object
range(start, stop[, step]) -> range object
Return an object that produces a sequence of integers from start (inclusive) to stop (exclusive) by step. range(i, j) produces i, i+1, i+2, ..., j-1. start defaults to 0 when it is omitted: range(4) produces 0, 1, 2, 3, which are exactly the valid indices for a list of 4 elements. When step is given, it specifies the increment (or decrement).
# removeprefix
Return a str with the given prefix string removed if present.
If the string starts with the prefix string, return string[len(prefix):]. Otherwise, return a copy of the original string.
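A possible Map operation, assuming the string and the prefix are passed as positional arguments (the values are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "removeprefix",
        "args": ["$SIGNATUR", "CH-"],
        "target": "signature_short"
      }
    ]
  }
}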
# removesuffix
Return a str with the given suffix string removed if present.
If the string ends with the suffix string and that suffix is not empty, return string[:-len(suffix)]. Otherwise, return a copy of the original string.
# replace
Return a copy with all occurrences of substring old replaced by new.
- count: Maximum number of occurrences to replace. -1 (the default value) means replace all occurrences.
If the optional argument count is given, only the first count occurrences are replaced.
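A possible Map operation, assuming the string, old and new are passed as positional arguments, as in str.replace (the field names are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "replace",
        "args": ["$TITEL", "_", " "],
        "target": "title_clean"
      }
    ]
  }
}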
# reversed
Return a reverse iterator over the values of the given sequence.
# rfind
S.rfind(sub[, start[, end]]) -> int
Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.
Return -1 on failure.
# rindex
S.rindex(sub[, start[, end]]) -> int
Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.
Raises ValueError when the substring is not found.
# rjust
Return a right-justified string of length width.
Padding is done using the specified fill character (default is a space).
# round
Round a number to a given precision in decimal digits.
The return value is an integer if ndigits is omitted or None. Otherwise the return value has the same type as the number. ndigits may be negative.
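A possible Map operation, assuming the number and ndigits are passed as positional arguments (the field names are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "round",
        "args": ["$duration_seconds", 2],
        "target": "duration_rounded"
      }
    ]
  }
}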
# rpartition
Partition the string into three parts using the given separator.
This will search for the separator in the string, starting at the end. If the separator is found, returns a 3-tuple containing the part before the separator, the separator itself, and the part after it.
If the separator is not found, returns a 3-tuple containing two empty strings and the original string.
# rstrip
Return a copy of the string with trailing whitespace removed.
If chars is given and not None, remove characters in chars instead.
# set
set() -> new empty set object
set(iterable) -> new set object
Build an unordered collection of unique elements.
# setif
The function sets the value in the target to a specific value when the condition is true.
The first argument is the resulting value (which may contain the value of another field when prefixed with $).
The second argument is a condition. See the separate chapter on conditions in the documentation.
When the condition resolves to false, the target field is not updated. But if the field does not yet exist, it is automatically set to None in order to have a consistent set of fields over all items in the pipeline.
Examples:
{
  "args": [
    "A value",
    ["$input", "==", "A"]
  ],
  "target": "output",
  "function": "setif"
},
{
  "args": [
    "$otherField",
    ["$input", "!=", "A"]
  ],
  "target": "output",
  "function": "setif"
}
# setvalue
The function simply sets the value in the target to a specific value.
The value argument is the resulting value (which may contain the value of another field when prefixed with $).
Examples:
{
  "args": [
    "some value"
  ],
  "target": "some_output",
  "function": "setvalue"
},
{
  "args": [
    "$otherField"
  ],
  "target": "other_output",
  "function": "setvalue"
}
# slice
slice(stop)
slice(start, stop[, step])
Create a slice object. This is used for extended slicing (e.g. a[0:10:2]).
# sorted
Return a new list containing all items from the iterable in ascending order.
A custom key function can be supplied to customize the sort order, and the reverse flag can be set to request the result in descending order.
# split
A more robust implementation of Python's str.split. If an exception is raised (e.g. when trying to split None), an empty list is returned.
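A possible Map operation, assuming the string and the separator are passed as positional arguments (the field names are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "split",
        "args": ["$SIGNATUR", "."],
        "target": "signature_parts"
      }
    ]
  }
}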
# splitlines
Return a list of the lines in the string, breaking at line boundaries.
Line breaks are not included in the resulting list unless keepends is given and true.
# startswith
S.startswith(prefix[, start[, end]]) -> bool
Return True if S starts with the specified prefix, False otherwise. With optional start, test S beginning at that position. With optional end, stop comparing S at that position. prefix can also be a tuple of strings to try.
# str
str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
# strftime
format -> strftime() style string. Formats the datetime according to the given format string.
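A possible Map operation, assuming the datetime value comes first and the format string second, as in datetime.strftime (the field names are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "strftime",
        "args": ["$created_at", "%Y-%m-%d"],
        "target": "created_date"
      }
    ]
  }
}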
# strip
Return a copy of the string with leading and trailing whitespace removed.
If chars is given and not None, remove characters in chars instead.
# strptime
string, format -> new datetime parsed from a string (like time.strptime()).
# sum
Return the sum of a 'start' value (default: 0) plus an iterable of numbers
When the iterable is empty, return the start value. This function is intended specifically for use with numeric values and may reject non-numeric types.
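A possible Map operation summing a numeric list field (the field names amounts and total are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "sum",
        "args": ["$amounts"],
        "target": "total"
      }
    ]
  }
}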
# swapcase
Convert uppercase characters to lowercase and lowercase characters to uppercase.
# timedelta
Difference between two datetime values.
timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)
All arguments are optional and default to 0. Arguments may be integers or floats, and may be positive or negative.
# title
Return a version of the string where each word is titlecased.
More specifically, words start with uppercased characters and all remaining cased characters have lower case.
# tuple
Built-in immutable sequence.
If no argument is given, the constructor returns an empty tuple. If iterable is specified the tuple is initialized from iterable's items.
If the argument is a tuple, the return value is the same object.
# upper
Return a copy of the string converted to uppercase.
# zfill
Pad a numeric string with zeros on the left, to fill a field of the given width.
The string is never truncated.
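A possible Map operation, assuming the string and the width are passed as positional arguments (the field names are illustrative):
{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "zfill",
        "args": ["$ID_NR", 10],
        "target": "id_padded"
      }
    ]
  }
}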
# zip
The zip function is used for combining multiple lists into one list.
Given I have some lists of data in an item.
{
"ids": [7, 5, 9],
"names": ["foo", "bar", "baz"]
}
with the zip function I can pair them together:
"operations": [
{
"function": "zip",
"args": [["$ids", "$names"]],
"target": "result"
}
]
and then get a combined list:
{
"result": [[7, "foo"], [5, "bar"], [9, "baz"]],
...
}
and I can also do the inverse operation with the same function by reducing a list level in the input:
"operations": [
{
"function": "zip",
"args": ["$result"],
"target": "result2"
}
]
and get:
{
"result2": [[7, 5, 9], ["foo", "bar", "baz"]],
...
}
# Options
operations - Operations (required)
- A list of operations, each consisting of at least a function, args and a target. See the section documentation for details on how to define operations.
- Type: List
# MissingSourceItems
Takes a list of objects representing the source objects, expecting a key named identifier_field on every object.
Yields the objects which are missing in the source but present in the system, by comparing the identifier_field of the source objects with the lookup_field on the model.
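Example (a possible configuration based on the options listed below; the defaults for identifier_field and lookup_field are shown explicitly for clarity):
{
  "section": "MissingSourceItems",
  "options": {
    "model": "record",
    "identifier_field": "ID_NR",
    "lookup_field": "identifier",
    "fields": ["identifier"]
  }
}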
# Options
model - Model name (required)
- Name of the model in lowercase.
- Type: Choice
- Choices:
  - record: Record
  - recordrelation: Link between records
  - file: Files
  - descriptor: Descriptor
  - descriptorrelation: Link between descriptors
  - descriptorrecordrelation: Link between descriptors and records
  - delivery: Delivery
  - deliveryrecordrelation: Link between deliveries and records
  - container: Container
  - containerrecordrelation: Link between containers and records
  - enumentry: Enum value
  - package: Package
fields - Fields
- A list of field names to yield.
- Type: List
- Default: ['identifier']
filter - Filter
- A JSON object containing the filters to apply.
- Type: Dict
identifier_field - Identifier field name
- The name of the field on the item containing the identifier.
- Type: Char
- Default: 'ID_NR'
lookup_field - Lookup field name
- Name of the field on the model used to look up the values from identifier_field.
- Type: Char
- Default: 'identifier'
# MyColexSource
This section fetches data from MyColex.
Use endpoint to specify which data to fetch.
To authenticate against the MyColex API, an API token needs to be provided or set as an environment variable (MYCOLEX_API_TOKEN).
Example:
{
"section": "MyColexSource",
"options": {
"base_url": "https://mycolex.example.org",
"endpoint": "inventory",
"page_size": 100
}
}
Example:
{
"section": "QuerysetSource",
"options": {
"model": "record",
"filter": {
"is_public": true,
"files_are_public": true
}
}
},
{
"section": "MyColexSource",
"options": {
"base_url": "https://mycolex.example.org",
"endpoint": "image",
"page_size": 100,
"filter": {
"published": true
},
"filter_by_previous_in": {
"previous_field": "identifier",
"filter_field": "objects"
}
}
}
# Options
endpoint - Endpoint (required)
- Which API endpoint to query data from (e.g. inventory, building, event, etc.).
- Type: Char
api_token - API Token
- The token used to authenticate in MyColex. This is usually configured as an environment variable and can thus be omitted.
- Type: Char
base_url - URL
- The URL to MyColex. This is usually configured as an environment variable and can thus be omitted.
- Type: Char
filter - Filter
- Filter parameters passed to the endpoint.
- Type: JSON
filter_by_previous_in - Filter requested data by items in the pipeline
- The filter_by_previous_in option allows selecting rows that have a relation to items consumed by previous sections in the pipeline. When this field is configured, it must contain an object with two configurations: the filter_field name, where the relation is stored in the source system, and the previous_field, which is the name of the field where the foreign identifier is stored in the consumed items. See the example for reference.
- Type: Dict
  - filter_field - Filter field name (required)
    - Name of the field in MyColex.
    - Type: Char
  - previous_field - Previous field name (required)
    - Name of the field in the pipeline item.
    - Type: Char
page_size - Elements per request
- Amount of items that are loaded in one page. Usually the default is fine.
- Type: Integer
- Min: 1
- Max: 1000
# Publish
The publish section publishes objects of various models from the internal database to the external database.
In the internal database we have three groups of models:
1. Models that are stored in a regular Postgres database in the internal and external system with the same model. They can be published with this Publish section.
2. Records, which are stored in Postgres in the internal system but only in Solr in the external system. They cannot be published with the Publish section, but with the Solr section, which is designed for this exact case.
3. Models that are never published.
In the Publish section, we publish objects of models of type 1, expecting them to have a 1:1 equivalent in the external system.
Examples:
{
"section": "Publish",
"options": {
"model": "file"
}
}
{
"section": "Publish",
"options": {
"model": "descriptorrecordrelation"
}
}
# Options
model - Model name (required)
- Name of the model to be published, with all lowercase characters.
- Type: Choice
- Choices:
  - record: Record
  - recordrelation: Link between records
  - file: Files
  - descriptor: Descriptor
  - descriptorrelation: Link between descriptors
  - descriptorrecordrelation: Link between descriptors and records
  - delivery: Delivery
  - deliveryrecordrelation: Link between deliveries and records
  - container: Container
  - containerrecordrelation: Link between containers and records
  - enumentry: Enum value
  - package: Package
batch_size - Batch size
- Amount of objects to be published in one batch.
- Type: Integer
- Default: 200
prune - Prune
- Delete all objects of this model from the external database before starting to publish. Use with caution and only when we really have to replace all data.
- Type: Boolean
- Default: false
# QuerysetSource
The queryset source section loads objects from the database and yields the objects into the pipeline. It is possible to apply simple queries by using "filter" and "exclude" functions of the Django ORM.
This can be used in combination with transform sections when data must be changed within the database.
Example:
{
"section": "QuerysetSource",
"options": {
"model": "record",
"filter": {
"is_public": true,
"files_are_public": true
}
}
}
# Options
model - Model name (required)
- Name of the model from which we want to load data.
- Type: Choice
- Choices:
  - record: Record
  - recordrelation: Link between records
  - file: Files
  - descriptor: Descriptor
  - descriptorrelation: Link between descriptors
  - descriptorrecordrelation: Link between descriptors and records
  - delivery: Delivery
  - deliveryrecordrelation: Link between deliveries and records
  - container: Container
  - containerrecordrelation: Link between containers and records
  - enumentry: Enum value
  - package: Package
batch_size - Batch size
- The number of objects that are loaded from the database at once.
- Type: Integer
- Default: 100
distinct - Distinct
- If checked, only distinct objects are yielded into the pipeline.
- Type: Boolean
exclude - Exclude
- Objects passing this condition are not yielded into the pipeline. The condition is defined in a Django ORM compatible way.
- Type: Dict
filter - Filter
- Objects passing this condition are yielded into the pipeline. The condition is defined in a Django ORM compatible way.
- Type: Dict
# RawSource
The raw source section allows injecting raw data into the pipeline. This can be helpful for testing pipelines when you want to work with a specific set of items.
Example:
{
"section": "RawSource",
"options": {
"data": [
{"key": "foo"},
{"key": "bar"}
]
}
}
# Options
data - Data (required)
- Raw list of items to feed into the pipeline.
- Type: JSON
# ResolveForeignKey
Resolves foreign keys for a given model and pops the result on the item.
The ResolveForeignKey section is used when we have a foreign key identifier but need the internal primary key of the object. The section looks up the object by the identifier field and replaces it with the pk, so that later sections, such as the create-sections, can process the data.
When we cannot find an object with this identifier in our database, the behavior can be configured in the missing option:
- raise (default): raise an exception and stop the pipeline execution. This can be used when we do not expect that it is missing and need to investigate such cases.
- drop: remove the whole item from the pipeline and do not further process it. This can be used when importing n:n relations where both ends must exist.
- nullify: set the value to null. This can be used when importing 1:1 relations which may or may not have the target imported as well.
Example:
{
"section": "ResolveForeignKey",
"options": {
"model": "descriptor",
"lookup": "identifier",
"source": "ID_NR_1",
"target": "from_descriptor_id",
"missing": "drop"
}
},
{
"section": "ResolveForeignKey",
"options": {
"model": "descriptor",
"lookup": "identifier",
"source": "ID_NR_2",
"target": "to_descriptor_id",
"missing": "drop"
}
},
# Options
lookup - Model field (required)
- Name of the field on the model containing the identifier. Usually identifier.
- Type: Char
model - Model name (required)
- Name of the model where the lookup should be executed.
- Type: Choice
- Choices:
  - record: Record
  - recordrelation: Link between records
  - file: Files
  - descriptor: Descriptor
  - descriptorrelation: Link between descriptors
  - descriptorrecordrelation: Link between descriptors and records
  - delivery: Delivery
  - deliveryrecordrelation: Link between deliveries and records
  - container: Container
  - containerrecordrelation: Link between containers and records
  - enumentry: Enum value
  - package: Package
source - Source field (required)
- Name of the field in the source item to get the identifier from.
- Type: Char
target - Target field (required)
- Name of the field where the primary key should be stored in.
- Type: Char
missing - Missing behavior
- Behavior when the object is not found.
- Type: Choice
- Default: 'raise'
- Choices:
  - raise: Abort the pipeline run with an exception (default)
  - drop: Remove the item from the pipeline
  - nullify: Set the value to null
# S3Uploader
The S3Uploader section uploads single files to an S3 bucket.
The section consumes items with a path to a temporary file (for instance produced by the JSONWriter section), reads the files from the temporary file system and uploads each file to an S3 bucket.
S3 configuration:
The uploader section uses preconfigured, named S3 targets. The reason is that the credentials (secrets) cannot be configured / exposed in the pipeline for security reasons.
The available S3 targets are configured through the environment variable S3_CONFIGS, where each preconfigured target has a name. The section needs to know the name of the target, so it has to be configured with the s3_name option.
Access / ACL:
By default, the uploaded files are not publicly / anonymously accessible, because the default ACL setting is private. In order to make them available to everyone anonymously, the acl option can be set to public-read.
Pruning:
The prune option lets the section remove older files (configurable through keep_hours), so that the S3 bucket does not fill up when the pipeline is configured to run automatically / nightly.
Be aware that for the prune option to work, a path needs to be configured, so that the files are uploaded to a folder instead of the bucket root. This is in order to prevent accidentally deleting a whole bucket.
Full example:
Input:
[
{"path": "/tmp/etl/json_writer/data.json"}
]
Section configuration:
{
"section": "S3Uploader",
"options": {
"s3_name": "json-dump",
"path": "enums",
"delete": true,
"acl": "private",
"prune": true,
"keep_hours": 120
}
}
# Options
s3_name - S3 config name (required)
- The name of the preconfigured S3 configuration.
- Type: Char
acl - S3 ACL
- S3 ACL setting, determining whether the file can be downloaded publicly or not.
- Type: Choice
- Default: 'private'
- Choices:
  - private: private
  - public-read: public-read
delete - Delete
- Delete the local file after uploading in order to reduce disk usage to a minimum.
- Type: Boolean
- Default: true
keep_hours - Keep hours
- When pruning, keep files on the S3 that are younger than the configured amount of hours.
- Type: Integer
- Default: 72
path - Path
- The path (directory) on the S3 within the bucket.
- Type: Char
- Default: null
prune - Prune
- Before uploading, remove existing files from the S3. Requires a path to be set as a precaution.
- Type: Boolean
- Default: false
# ScopeArchivSource
A generic section to fetch data from Scope Archiv using the Scope Archiv Connector.
The Scope Archiv Connector is a separate, optional service which reads preconfigured Oracle views from a Scope Archiv database and provides them as a REST API to the ETL pipeline. The ScopeArchivSource section handles the communication with the service.
Use table_name and columns to specify which data to fetch. order_columns defines the columns used to order the data; they should be unique together.
Take care when configuring order_columns: it should be one or more columns which are indexed and unique together. The columns must be indexed in order to have good enough performance on Oracle, and they must be unique together in order for the pagination to work properly; otherwise data will be missing. In this context, the order is solely the order in which the items are loaded and processed. Since items cannot have dependencies within the pipeline, this should be optimized for best performance.
Use a single column when querying "primary" data views, containing records which each have an own unique identifier. Use multiple columns on the other hand when reading from junction tables which usually do not have one column that is unique.
Example:
{
"section": "ScopeArchivSource",
"options": {
"columns": [
"ID_NR",
"TITEL",
"PARENT_ID_NR",
"ZWEIG_POSITION",
"SIGNATUR",
"ANZEIGE_INTERNET"
],
"page_size": 100,
"filter_sql": ""HIERARCHIE_PFAD" NOT LIKE '00000796390000546102%'",
"table_name": "public.mvk_dls_vrzng_enht_1",
"order_columns": [
"ID_NR"
]
}
}
With the above example, the section acts as the first section in the pipeline. It can also be used for looking up specific items based on other items that are already in the pipeline, e.g. all files for specific records.
Example:
{
"section": "QuerysetSource",
"options": {
"model": "record",
"filter": {
"is_public": true,
"files_are_public": true
}
}
},
{
"section": "ScopeArchivSource",
"options": {
"columns": [
"PID",
"ID_NR_VRZNG_ENHT"
],
"page_size": 100,
"table_name": "public.mvk_dls_ve_vrkng_fdra_1",
"order_columns": [
"PID",
"ID_NR_VRZNG_ENHT"
],
"filter_by_previous_in": {
"column": "ID_NR_VRZNG_ENHT",
"previous_field": "identifier"
}
}
}
# Options
columns - Columns (required)
- A list of columns to be selected from the source.
- Type: ListSerializer
order_columns - Order columns (required)
- A list of one or more columns for ordering, which are indexed and unique together.
- Type: ListSerializer
table_name - Table name (required)
- The name of the table / Oracle view to select from.
- Type: Char
filter_by_previous_in - Filter requested data by items in the pipeline
- The filter_by_previous_in option allows selecting rows that have a relation to items consumed by previous sections in the pipeline. When this field is configured, it must contain an object with two configurations: the column name, where the relation is stored in the source system, and the previous_field, which is the name of the field where the foreign identifier is stored in the consumed items. See the example for reference.
- Type: Dict
  - column - Filter field name (required)
    - Name of the column in the source system.
    - Type: Char
  - previous_field - Previous field name (required)
    - Name of the field in the pipeline item.
    - Type: Char
filter_sql - SQL filter clause
- Additional WHERE clause in raw SQL for when we want to already filter on the source to improve performance. The filter_sql statement is joined with the generated filter_by_previous_in clause with AND.
- Type: Char
page_size - Page size
- Amount of items that are loaded in one page. Usually the default is fine.
- Type: Integer
- Min: 1
- Max: 1000
pagination - Pagination
- Depending on how the table looks, different pagination implementations are more efficient. Can be cursor or offset. By default, the cursor pagination is used when we sort by only one column, the offset pagination for multiple columns. Usually it is best to not configure this option.
- Type: Choice
- Choices:
  - cursor: Cursor-based pagination
  - offset: Pagination with offset and limit
# SkipProcessedFiles
The SkipProcessedFiles section has the job to detect as early as possible when we are processing files that we have already processed and skip those files.
Importing files is very time consuming. That's why we want to skip files when we already have processed and imported them before. This makes it possible that pipelines can be aborted and restarted when needed and they can be configured in a way that they will skip items as long as they are already processed.
To be able to make updates to the configuration (e.g. tile sizes of images) and re-process the files, there is a pipeline_version string configured, which is stored on each file. If the pipeline version string from the pipeline configuration matches the one stored on the file, the file is skipped. When the configuration is changed and the goal is to rerun everything, the pipeline version configuration in the section is increased and the pipeline rerun. With this mechanism, we can successively update the images and always have data published.
Be aware that this is optimized for non-Fedora sources by default. In the Fedora case, you must set the fedora flag to true so that only the first part of the identifier is compared, which is equal to the PID in the Fedora case. This is necessary because one package (PID) can result in multiple files.
Example:
{
"section": "SkipProcessedFiles",
"options": {
"identifier_field": "ID_NR_DATEI",
"pipeline_version": "1.0.0"
}
}
# Options
pipeline_version - Pipeline version (required)
- Pipeline version string. The skipping is based on this string. Can be changed when the pipeline is updated.
- Type: Char
batch_size - Batch size
- Amount of items to be processed in one batch.
- Type: Integer
- Default: 50
fedora - Fedora mode
- Set to true when the source is Fedora, causing the identifiers to be compared by removing the path.
- Type: Boolean
- Default: false
identifier_field - Identifier field name
- Name of the field containing the unique identifier of the file.
- Type: Char
- Default: 'ID_NR_DATEI'
# Slice
The slice section is used for only processing a certain slice of the items yielded by a previous section.
This is useful for testing pipeline configurations. It allows limiting the amount of items yielded, but also skipping the first n items by setting start to n.
Example:
{
"section": "Slice",
"options": {
"limit": 100,
"start": 0
}
}
# Options
limit - Limit (required)
- The maximum amount of items to be yielded.
- Type: Integer
start - Start
- Skip the first n items and do not yield them.
- Type: Integer
- Default: 0
# Solr
The Solr section indexes records in the external, public Solr instance.
In the DLS, the Solr instance is installed in the external part, and data indexed in Solr is available to the public in the search and used for displaying data in the web.
Solr is configured in a way that it only indexes records that have the is_public flag set to True.
The Solr section consumes items from the previous section, identifies them based on the identifier_field (which must match the value on the record in the identifier field) and publishes them.
It is also possible to prune the Solr, removing all data from Solr. This has the impact that the records can no longer be found in the web until indexing is fully completed.
Example:
{
"section": "Solr",
"options": {
"prune": false,
"batch_size": 500,
"identifier_field": "identifier",
"lookup_field": "identifier"
}
}
# Options
batch_size - Batch size
- Amount of items to be processed in one batch.
- Type: Integer
- Default: 200
identifier_field - Identifier field name
- Name of the field in the consumed item that contains the unique identifier.
- Type: Char
- Default: 'identifier'
lookup_field - Lookup field name
- Name of the field on the record used to look up the values in the identifier_field.
- Type: Char
- Default: 'identifier'
prune - Prune
- Remove all data from Solr before starting to index.
- Type: Boolean
- Default: false
# Tree
The Tree section builds a tree based on the records, so that the records can be displayed as a tree in the web.
Create the tree (Node instances) based on records in the database. Starting with the root node, all immediate children nodes of this record are created. If the children have further children, they are recursively created as well.
Yield the data used to create the tree.
Example:
{
"section": "Tree",
"options": {
"root_value": "-1"
}
}
# Options
root_value - Root value
- Identifier of the root record. There must be exactly one root record.
- Type: Char
- Default: '-1'
# TreeSorter
Sort items for building a tree.
The sorter makes sure that parents always are before their children in the pipeline.
The sorter also handles orphan nodes according to the configuration.
Options:
- id_field: name of the ID field of a node.
- parent_id_field: name of the parent ID field of a node.
- root_value: parent_id_field value of the root node.
- orphans: Handling of orphan nodes.
Orphan node options:
- "drop": drop the records (default)
- "raise": abort the pipeline run with an exception
# Options
id_field - ID field name
- Type: Char
- Default: 'ID_NR'
orphans - Orphan strategy
- Type: Choice
- Default: 'raise'
- Choices:
  - drop: Drop the records (default)
  - raise: Abort the pipeline run with an exception
parent_id_field - Parent-ID field name
- Type: Char
- Default: 'PARENT_ID_NR'
root_value - Root value id
- Type: Char
- Default: '-1'
# XMLReader
The XMLReader section allows to read XML files and load the contained items into the pipeline.
This section expects the previous section to yield a dictionary with a path key, such as it is done by the HTTP section. The path key should point to an XML file.
An example of a valid item is:
[
{
"path": "/path/to/file.xml"
}
]
Let's say we have an XML file with the following structure:
<SearchDetailResponse xmlns="http://www.cmiag.ch/cdws/searchDetailResponse">
<Verzeichnungseinheit>
<ID>164494</ID>
<Titel>Abteilung für die XYZ</Titel>
<Verzeichnungsstufe>Serie</Verzeichnungsstufe>
<Findmittel/>
</Verzeichnungseinheit>
</SearchDetailResponse>
We can use the items option to specify the path to the items in the XML file.
The path should be a valid JSON Pointer. For more information about the JSON Pointer syntax, please refer to the JSON Pointer documentation.
For example, if we want to yield the Verzeichnungseinheit items, we can use the following configuration:
{
"section": "XMLReader",
"options": {
"items": "/SearchDetailResponse/Verzeichnungseinheit"
}
}
This will yield the following items:
[
{
"ID": "164494",
"Titel": "Abteilung für die XYZ",
"Verzeichnungsstufe": "Serie",
"Findmittel": null,
}
]
# Making the child items a list
Let's say we have an XML file with the following structure:
<SearchDetailResponse xmlns="http://www.cmiag.ch/cdws/searchDetailResponse">
<Verzeichnungseinheit>
<ID>164494</ID>
<Titel>Abteilung für die XYZ</Titel>
<Verzeichnungsstufe>Serie</Verzeichnungsstufe>
<Bilder>
<Bild src="imageserver.ch/image123456.jpg">Bild 1</Bild>
</Bilder>
<Findmittel/>
</Verzeichnungseinheit>
</SearchDetailResponse>
By default, the Bilder item will be a dictionary because there is only one Bilder element.
Using the following default configuration:
{
  "section": "XMLReader"
}
This will yield the following item:
[
{
"SearchDetailResponse": {
"@xmlns": "http://www.cmiag.ch/cdws/Verzeichnungseinheit",
"Verzeichnungseinheit": {
"ID": "164494",
"Titel": "Abteilung für die XYZ",
"Verzeichnungsstufe": "Serie",
"Bilder": {
"Bild": {
"@src": "imageserver.ch/image123456.jpg",
"#text": "Bild 1"
}
},
"Findmittel": null,
},
}
}
]
If we want the Bilder items to be a list, we can use the ensure_list option:
{
"section": "XMLReader",
"options": {
"ensure_list": ["Bilder"]
}
}
This will yield the following item:
[
{
"SearchDetailResponse": {
"@xmlns": "http://www.cmiag.ch/cdws/Verzeichnungseinheit",
"Verzeichnungseinheit": {
"ID": "164494",
"Titel": "Abteilung für die XYZ",
"Verzeichnungsstufe": "Serie",
"Bilder": [
{
"Bild": {
"@src": "imageserver.ch/image123456.jpg",
"#text": "Bild 1"
}
}
],
"Findmittel": null,
},
}
}
]
The ensure_list option can be used to ensure that the items are a list. You only specify the name of the tag that should become a list; all tags with that name will be converted to a list.
# Using the path option
The path option can be used to specify the path to the XML file.
The path is determined by looking up "$"-prefixed names in the previous section's items.
For example, if the previous section yields the following item:
[
{
"different_path_key": "/path/to/file.xml"
}
]
We can use the following configuration:
{
"section": "XMLReader",
"options": {
"path": "$different_path_key"
}
}
The path can also be a literal file path. Regardless of the previous section, the file will then be read from the specified path.
For example, we can use the following configuration:
{
"section": "XMLReader",
"options": {
"path": "/path/to/file.xml"
}
}
# Keep the previous item data
By default, the previous item data is discarded and the item is replaced with the parsed result.
If you want to keep the previous item data, you can use the target_key option.
For example, we can use the following configuration:
{
"section": "XMLReader",
"options": {
"target_key": "xml_data"
}
}
This will yield the following item:
[
{
"path": "/path/to/file.xml",
"xml_data": [
{
"ID": "164494",
"Titel": "Abteilung für die XYZ",
"Verzeichnungsstufe": "Serie",
"Findmittel": null,
}
]
}
]
# Options
ensure_list - Ensure list
- The tag names that should contain lists.
- Type: List
items - Items
- JSON Pointer to the XML node containing the items to be yielded.
- Type: Char
- Default: ''
path - Path
- Expression for getting the path to read.
- Type: Char
- Default: '$path'
target_key - Target key
- Key in the item where to store the results. When null, the item in the pipeline is replaced with the parsed result.
- Type: Char
- Default: null