Logpickr Sink
Overview
A Sink connector transfers data from Kafka to another system (in our case, the Logpickr public API).
The connector retrieves AVRO-formatted data from a Kafka topic and forwards it to the Logpickr API.
The data schema is known via the Kafka Schema Registry and is mapped to the Logpickr project's Column Mapping. Data is aggregated for the project and displayed by Process Explorer 360.
The connector can also create a new Column Mapping for the project, if none exists (on a new project, for example).
The connector also aggregates data according to four thresholds:
- Element number: when the number of aggregated elements reaches a certain value, the aggregation result is sent to the Logpickr API
- Value pattern: a regex pattern that triggers the flush of the current aggregation to the Logpickr API if the value of the incoming sink record matches it
- Timeout: after a certain time since the last aggregation result was sent, the currently aggregated data is sent (even if the element number threshold is not reached)
- Retention: this threshold is not defined by the user in the connector's properties. It is linked to the value of the retention.ms configuration of the Kafka topic the data comes from. See the section below related to the retention for more information
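As an illustration, here is a minimal Python sketch of how the three user-defined thresholds could interact. This is not the connector's actual implementation; the names used (Aggregator, send_to_logpickr_api) are hypothetical:

```python
import re
import time

def send_to_logpickr_api(events, reason):
    # Hypothetical stand-in for the actual call to the Logpickr API.
    print(f"sending {len(events)} aggregated events ({reason})")

class Aggregator:
    """Sketch of the element number, value pattern and timeout thresholds."""

    def __init__(self, element_number, value_pattern, timeout_in_seconds):
        self.element_number = element_number            # threshold.elementNumber
        self.value_pattern = re.compile(value_pattern)  # threshold.valuePattern
        self.timeout = timeout_in_seconds               # threshold.timeoutInSeconds
        self.buffer = []
        self.last_send = time.monotonic()

    def add(self, record_value: str):
        self.buffer.append(record_value)
        if len(self.buffer) >= self.element_number:
            self.flush("elementNumber reached")
        elif self.value_pattern.fullmatch(record_value):
            # The pattern is applied to the entire string value of the record.
            self.flush("valuePattern matched")

    def tick(self):
        # Called periodically: flush if too much time passed since the last send.
        if self.buffer and time.monotonic() - self.last_send >= self.timeout:
            self.flush("timeout exceeded")

    def flush(self, reason: str):
        send_to_logpickr_api(self.buffer, reason)
        self.buffer = []
        self.last_send = time.monotonic()

aggregator = Aggregator(element_number=6, value_pattern=r".*regex_example.*", timeout_in_seconds=3000)
aggregator.add('{"caseId": "case_1", "activity": "step A"}')
aggregator.tick()
```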
Properties
Connector instantiation example:
connector.class = "com.logpickr.kafka.sink.main.domain.LogpickrAggregationSinkConnector",
tasks.max = "1",
topics = "logpickr_topic_example",
api.url = "api_url_example",
api.authUrl = "auth_url_example",
workGroupId = "w_id_example",
workGroupKey = "w_key_example",
projectId = "project_id_example",
csv.encoding = "UTF-8",
csv.separator = ",",
csv.quote = """,
csv.fieldsNumber = "9",
csv.header = "true",
csv.defaultTextValue = "null",
retentionTimeInDay = "100",
columnMapping.create = "true",
columnMapping.caseIdColumnIndex = "0",
columnMapping.activityColumnIndex = "1",
columnMapping.timeInformationList = "{2;dd/MM/yy HH:mm},{3;dd/MM/yy HH:mm}",
columnMapping.dimensionsInformationList = "[{"columnIndex": 4, "name": "Country", "isCaseScope": true, "aggregation": "FIRST", "groupedTasksAggregation": "FIRST"},{"columnIndex": 5, "name": "Region", "isCaseScope": false, "groupedTasksAggregation": "FIRST"},{"columnIndex": 6, "name": "City", "isCaseScope": false, "groupedTasksAggregation": "LAST"}]",
columnMapping.metricsInformationList = "[{"columnIndex": 7, "name": "Price", "unit": "Euros", "isCaseScope": true, "aggregation": "MIN", "groupedTasksAggregation": "AVG"},{"columnIndex": 8, "name": "DepartmentNumber", "isCaseScope": false, "groupedTasksAggregation": "FIRST"}]",
columnMapping.groupedTasksColumns = "[1, 2, 3]",
csv.endOfLine = "\\n",
csv.escape = "\\",
csv.comment = "#",
kafkaLoggingEvents.isLogging = "true",
kafkaLoggingEvents.topic = "event_logging_topic_example",
threshold.elementNumber = "6",
threshold.valuePattern = ".*regex_example.*",
threshold.timeoutInSeconds = "3000",
bootstrap.servers = "broker:29092",
key.converter = "org.apache.kafka.connect.storage.StringConverter",
value.converter = "io.confluent.connect.avro.AvroConverter",
value.converter.schema.registry.url = "http://schema-registry:8081"
Warning: the backslash character must be escaped
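As a sketch, a configuration like the one above can be registered with a Kafka Connect worker through its standard REST API. The worker URL (http://localhost:8083) and the connector name are assumptions, and only a subset of the properties from the example is repeated here:

```python
import json
import urllib.request

# Minimal sketch: register the sink connector through the Kafka Connect REST API.
config = {
    "connector.class": "com.logpickr.kafka.sink.main.domain.LogpickrAggregationSinkConnector",
    "tasks.max": "1",
    "topics": "logpickr_topic_example",
    "api.url": "api_url_example",
    "api.authUrl": "auth_url_example",
    "workGroupId": "w_id_example",
    "workGroupKey": "w_key_example",
    "projectId": "project_id_example",
    "csv.encoding": "UTF-8",
    "csv.separator": ",",
    "csv.quote": "\"",
    "csv.fieldsNumber": "9",
    "csv.header": "true",
    "csv.defaultTextValue": "null",
    "retentionTimeInDay": "100",
    "threshold.elementNumber": "6",
    "bootstrap.servers": "broker:29092",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
}

body = json.dumps({"name": "logpickr-sink-example", "config": config}).encode()
req = urllib.request.Request(
    "http://localhost:8083/connectors",  # assumed Kafka Connect worker URL
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())
```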
Mandatory Properties:
The following properties must not be changed:
- connector.class (String)
- key.converter (String)
- value.converter (String)
The other properties can be modified:
- tasks.max (Int): Maximum number of tasks for the connector
- api.url (String): Logpickr API URL (public API)
- api.authUrl (String): Authentication URL used to retrieve the connection token
- workGroupId (String): Workgroup ID
- workGroupKey (String): Workgroup Key
- projectId (String): Logpickr project ID (UUID)
- csv.encoding (String): File encoding (UTF-8/ASCII/ISO-8859-1)
- csv.separator (String): CSV file separator (1 character only)
- csv.quote (String): CSV quote character (1 character only)
- csv.fieldsNumber (Int): Number of fields for each row (>= 3)
- csv.header (Boolean): Whether the CSV file has a header or not (true/false)
- csv.defaultTextValue (String): Substitution value in case of a missing value in a column
- retentionTimeInDay (Int): File archive duration in days (> 0)
- threshold.elementNumber (Int): The maximum number of elements in one aggregation (when this number is reached, the current aggregation is sent to the Logpickr API)
- threshold.valuePattern (String): Optional property that sends the current aggregation to the Logpickr API if the value of an incoming sink record matches the regex defined in threshold.valuePattern (the aggregation is sent even if the number of elements corresponding to threshold.elementNumber is not reached). This threshold has no effect if the property is not defined when creating the connector. The pattern applies to the entire string value of a SinkRecord (if the value is in JSON format, for example, the pattern has to take the '{' and '}' of the JSON into account; it needs to be adapted to the structure of the data)
- threshold.timeoutInSeconds (Int): If the time since the last aggregation result was sent exceeds the value in seconds of threshold.timeoutInSeconds, the current aggregation (which was under construction) is sent to the Logpickr API (even if the number of elements corresponding to threshold.elementNumber is not reached)
- bootstrap.servers (String): The list of Kafka brokers
- value.converter.schema.registry.url (String): URL of the Confluent Schema Registry
For more information about regular expressions (used with the threshold.valuePattern property), see: https://medium.com/factory-mind/regex-tutorial-a-simple-cheatsheet-by-examples-649dc1c3f285
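For instance, here is a small Python illustration of how such a pattern applies to the whole string value of a record; the JSON value and the pattern are made up for the example:

```python
import re

# threshold.valuePattern is matched against the entire string value of a SinkRecord.
# If the value is JSON, the pattern must account for the surrounding '{' and '}'.
pattern = re.compile(r'\{.*"status"\s*:\s*"END".*\}')  # hypothetical pattern

record_value = '{"caseId": "case_42", "activity": "ship order", "status": "END"}'

if pattern.fullmatch(record_value):
    print("match: the current aggregation would be sent to the Logpickr API")
```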
Optional:
The following properties are used to define the Column Mapping:
- columnMapping.create (Boolean): Create the Column Mapping (true/false). If true, all the following columnMapping properties need to be filled in; if false, it is not necessary to write them
- columnMapping.caseIdColumnIndex (Int): Index of the CaseId column (>= 0)
- columnMapping.activityColumnIndex (Int): Index of the Activity column (>= 0)
- columnMapping.timeInformationList (String): Information about the Time columns, in the format {columnIndex;dateFormat}, separated by a ',' if there are two columns. Information for at least one column and for a maximum of two columns must be provided. columnIndex needs to be an Int >= 0 and dateFormat a String of minimum length 1
- columnMapping.dimensionsInformationList (String): Information about the Dimension columns, given in JSON format. columnIndex needs to be an Int >= 0 and name a String of minimum length 1. isCaseScope is a mandatory boolean used to determine whether, for an event, the value of the column is calculated according to an aggregation on the entire case. For Dimensions, the valid aggregation types are "FIRST", "LAST" and "DISTINCT"; the chosen aggregation type is defined with the aggregation parameter. Note that if isCaseScope is true you need to give a valid aggregation type, and if it is false the aggregation parameter may be written or omitted. When the columnMapping.groupedTasksColumns property is defined, each dimension needs to set the groupedTasksAggregation parameter (this parameter should not be set if the columnMapping.groupedTasksColumns property is not defined); the valid grouped tasks aggregation types are "FIRST" and "LAST". For more information about grouped tasks, see the Grouped Tasks Columns section below.
- columnMapping.metricsInformationList (String): Information about the Metric columns, given in JSON format. columnIndex needs to be an Int >= 0 and name a String of minimum length 1. isCaseScope is a mandatory boolean used to determine whether, for an event, the value of the column is calculated according to an aggregation on the entire case. For Metrics, the valid aggregation types are "FIRST", "LAST", "MIN", "MAX", "SUM", "AVG" and "MEDIAN"; the chosen aggregation type is defined with the aggregation parameter. Note that if isCaseScope is true you need to give a valid aggregation type, and if it is false the aggregation parameter may be written or omitted. When the columnMapping.groupedTasksColumns property is defined, each metric needs to set the groupedTasksAggregation parameter (this parameter should not be set if the columnMapping.groupedTasksColumns property is not defined); the valid grouped tasks aggregation types are "FIRST", "LAST", "MIN", "MAX", "SUM", "AVG" and "MEDIAN". For more information about grouped tasks, see the Grouped Tasks Columns section below. Finally, the unit parameter needs to be a String and is optional (it is possible to fill in only the columnIndex, the name, and the isCaseScope).
- columnMapping.groupedTasksColumns (String): Information about the columns to use for grouping events, given as a List in JSON format. If this property is not defined, similar events are not grouped; if it is defined, it should contain at least the index of one time/dimension/metric column. When this property is defined, all the dimensions (columnMapping.dimensionsInformationList property) and metrics (columnMapping.metricsInformationList property) should define a groupedTasksAggregation. For more information about grouped tasks, see the Grouped Tasks Columns section below.
- csv.endOfLine (String): End of line character (size >= 1)
- csv.escape (String): Escape character (1 character)
- csv.comment (String): Comment character (1 character)
For the Column Mapping, each column index must be different, and the total number of columns must be equal to the mandatory property csv.fieldsNumber.
Note that if csv.header is true and the connector is supposed to create a Column Mapping in the Logpickr project, then the headers of the generated files will use the Column Mapping information for their column names. Otherwise, if csv.header is true but the connector isn't supposed to create a Column Mapping, the headers will simply correspond to a succession of csv.fieldsNumber - 1 separator characters, defined by csv.separator.
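As an illustration, this consistency rule can be checked ahead of time with a short Python sketch; the values below are copied from the instantiation example, and only the column indexes matter here:

```python
import json
import re

# Sketch: verify that all column indexes in the mapping are distinct and that
# their total equals csv.fieldsNumber.
case_id_index = 0
activity_index = 1
time_information = "{2;dd/MM/yy HH:mm},{3;dd/MM/yy HH:mm}"
dimensions = json.loads('[{"columnIndex": 4}, {"columnIndex": 5}, {"columnIndex": 6}]')
metrics = json.loads('[{"columnIndex": 7}, {"columnIndex": 8}]')
fields_number = 9  # csv.fieldsNumber

time_indexes = [int(m.group(1)) for m in re.finditer(r"\{(\d+);", time_information)]
all_indexes = (
    [case_id_index, activity_index]
    + time_indexes
    + [d["columnIndex"] for d in dimensions]
    + [m["columnIndex"] for m in metrics]
)

assert len(all_indexes) == len(set(all_indexes)), "column indexes must all be different"
assert len(all_indexes) == fields_number, "total number of columns must equal csv.fieldsNumber"
print("column mapping is consistent with csv.fieldsNumber =", fields_number)
```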
Finally, the following properties have to be defined only if you need the connector to register its file-related events in a Kafka topic (see the section below about Logging Events in Kafka):
- kafkaLoggingEvents.isLogging (Boolean): Indicates whether the connector registers its file-related events in a Kafka topic (true/false); if true, events will be logged in a Kafka topic, if false (default value) they won't
- kafkaLoggingEvents.topic (String): The name of the Kafka topic registering the events (length >= 1)
AVRO Format
The connector uses the AVRO format.
The expected format from the topic is the following (defined in the Schema Registry):
ARRAY<STRUCT<columnID INT, text VARCHAR, quote BOOLEAN>>
The Array corresponds to one event (and one event corresponds to one line in the CSV file), with each STRUCT in the Array corresponding to a column of the event (a field in the CSV file, such as the caseId or the activity for instance).
So one record coming from Kafka corresponds to one event, and the connector aggregates multiple events together. When a threshold is reached, the aggregated events are written to the same file, which is then sent to the Logpickr API.
To correctly write a field in the CSV file, we need:
- Its column number (columnId),
- Its value (text)
- To know if the field is a quote or not (quote)
For instance, the following data coming from a Kafka topic (represented here in JSON format, but actually in AVRO format):
"{"DATAARRAY":
[{"QUOTE":true, "TEXT":"activity1", "COLUMNID":1},
{"QUOTE":false, "TEXT":"caseId1", "COLUMNID":0},
{"QUOTE":false, "TEXT":"endDate1", "COLUMNID":3}]
}"
will be written as the following line in the CSV file:
caseId1,"activity1",null,endDate1
given the following connector properties:
- csv.separator = ,
- csv.quote = "
- csv.defaultTextValue = null
- csv.fieldsNumber = 4
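A minimal Python sketch of this conversion (build_csv_line is a hypothetical helper written for this example, not the connector's code):

```python
# Sketch of how one AVRO record (one event) becomes one CSV line, using the
# connector properties listed above.
def build_csv_line(data_array, fields_number, separator=",", quote='"', default="null"):
    fields = [default] * fields_number       # missing columns get csv.defaultTextValue
    for column in data_array:
        text = column["TEXT"]
        if column["QUOTE"]:                  # wrap the value in csv.quote characters
            text = f"{quote}{text}{quote}"
        fields[column["COLUMNID"]] = text
    return separator.join(fields)

event = [
    {"QUOTE": True,  "TEXT": "activity1", "COLUMNID": 1},
    {"QUOTE": False, "TEXT": "caseId1",   "COLUMNID": 0},
    {"QUOTE": False, "TEXT": "endDate1",  "COLUMNID": 3},
]

print(build_csv_line(event, fields_number=4))   # -> caseId1,"activity1",null,endDate1
```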
Warning: the names DATAARRAY, QUOTE, TEXT and COLUMNID need to be respected in ksqlDB in order to correctly read the AVRO data coming from a Kafka topic.
Any null value for an event, for a column of an event, or for a parameter in a column of an event is considered an error and will stop the Task.
Grouped Tasks Columns
While defining the Column Mapping (when columnMapping.create is true), we can also optionally define, with the columnMapping.groupedTasksColumns property, a set of columns used to group together events that have the same values in those columns. The events are grouped within their case, and grouped tasks aggregations need to be defined for the dimensions and metrics.
The idea of this functionality is to merge multiple similar events into a single event.
Grouped Tasks Example
Let's take the following events as an example:
CaseId | Activity | StartDate | EndDate | Country | City | Price |
---|---|---|---|---|---|---|
1 | A | 10/10/10 08:38 | 11/10/10 08:38 | France | Paris | 10 |
1 | B | 10/10/10 09:40 | 11/10/10 09:40 | Germany | Berlin | 20 |
1 | A | 10/10/10 10:42 | 11/10/10 10:42 | France | Toulouse | 30 |
1 | C | 10/10/10 11:50 | 11/10/10 11:50 | Germany | Munich | 10 |
1 | C | 10/10/10 12:50 | 11/10/10 12:50 | Germany | Hamburg | 20 |
2 | A | 10/10/10 08:20 | 11/10/10 08:20 | France | Rennes | 5 |
2 | B | 10/10/10 09:30 | 11/10/10 09:30 | Germany | Berlin | 10 |
2 | A | 10/10/10 10:40 | 11/10/10 10:40 | France | Bordeaux | 25 |
2 | A | 10/10/10 11:50 | 11/10/10 11:50 | USA | New York | 10 |
And let's say that the column mapping properties in the connector look like this:
columnMapping.create = "true",
columnMapping.caseIdColumnIndex = "0",
columnMapping.activityColumnIndex = "1",
columnMapping.timeInformationList = "{2;dd/MM/yy HH:mm},{3;dd/MM/yy HH:mm}",
columnMapping.dimensionsInformationList = "[{"columnIndex": 4, "name": "Country", "isCaseScope": false, "groupedTasksAggregation": "FIRST"},{"columnIndex": 5, "name": "City", "isCaseScope": false, "groupedTasksAggregation": "FIRST"}]",
columnMapping.metricsInformationList = "[{"columnIndex": 6, "name": "Price", "unit": "Euros", "isCaseScope": true, "aggregation": "MIN", "groupedTasksAggregation": "AVG"}]",
columnMapping.groupedTasksColumns = "[1, 4]"
Here the columns to use for grouping are the ones corresponding to indexes 1 and 4, which are respectively the Activity and Country columns. They are defined through the columnMapping.groupedTasksColumns property.
When columnMapping.groupedTasksColumns is defined, we also need to define the groupedTasksAggregation argument for each dimension/metric. With this example, here are the grouped tasks aggregations defined for the dimension and metric columns:
- FIRST for the Country dimension column
- FIRST for the City dimension column
- AVG for the Price metric column
For the dimension columns the valid grouped tasks aggregation values are FIRST/LAST
For the metric columns the valid grouped tasks aggregation values are FIRST/LAST/MIN/MAX/SUM/AVG/MEDIAN
Consequently, within a case, all the events that have the same values for the Activity and Country columns will be grouped together, and the new values for the dimension and metric columns are computed according to their related groupedTasksAggregation.
If the timestamp columns are not among the columns used for grouping (here, columns 2 and 3 are not in the columnMapping.groupedTasksColumns property), we don't have to define an aggregation as for the dimensions or metrics:
- The lowest timestamp of all the events of a group will be used as the new start timestamp of the new single event.
- The highest timestamp of all the events of a group will be used as the new end timestamp of the new single event.
After the creation of the connector, a Mining project that has the column mapping defined above will receive those events and will regroup some of them in the following way:
For CaseId 1:
- The first and third events of this case have the same values for their Activity (A) and Country (France) columns. Consequently, they are grouped together into a single event of activity A and country France.
- The second event is not grouped, as no other event in this case has an Activity B and a Country Germany.
- The fourth and fifth events of this case have the same values for their Activity (C) and Country (Germany) columns. Consequently, they are grouped together into a single event of activity C and country Germany.
For CaseId 2:
- The first and third events of this case have the same values for their Activity (A) and Country (France) columns. Consequently, they are grouped together into a single event of activity A and country France.
- The second event is not grouped, as no other event in this case has an Activity B and a Country Germany.
- The fourth event is not grouped; it has the same Activity (A) as the first and third events of this case, but its Country (USA) is different.
After grouping the similar events together, we get this list of events:
CaseId | Activity | StartDate | EndDate | Country | City | Price |
---|---|---|---|---|---|---|
1 | A | 10/10/10 08:38 | 11/10/10 10:42 | France | Paris | 20 |
1 | B | 10/10/10 09:40 | 11/10/10 09:40 | Germany | Berlin | 20 |
1 | C | 10/10/10 11:50 | 11/10/10 12:50 | Germany | Munich | 15 |
2 | A | 10/10/10 08:20 | 11/10/10 10:40 | France | Rennes | 15 |
2 | B | 10/10/10 09:30 | 11/10/10 09:30 | Germany | Berlin | 10 |
2 | A | 10/10/10 11:50 | 11/10/10 11:50 | USA | New York | 10 |
For CaseId 1:
- The first event of this case in the new list of events was created by grouping the first and third events of this case in the initial list of events (before grouping).
  - CaseId was 1 for the two events that were grouped, so it stays at 1 for the new single event.
  - Activity was A for the two events that were grouped, so it stays at A for the new single event.
  - StartDate was 10/10/10 08:38 for the first event that was grouped, and 10/10/10 10:42 for the second one. The lowest timestamp (10/10/10 08:38) is used as the start timestamp of the new single event.
  - EndDate was 11/10/10 08:38 for the first event that was grouped, and 11/10/10 10:42 for the second one. The highest timestamp (11/10/10 10:42) is used as the end timestamp of the new single event.
  - Country was France for the two events that were grouped, so it stays at France for the new single event.
  - City was Paris for the first event that was grouped, and Toulouse for the second one. In the column mapping, FIRST was defined as the groupedTasksAggregation for this dimension; consequently, as Paris is the first value to come, it is the one used for the new single event.
  - Price was 10 for the first event that was grouped, and 30 for the second one. In the column mapping, AVG was defined as the groupedTasksAggregation for this metric; consequently, 20 is the value of this metric for the new single event (20 being the average of 10 and 30).
- The second event of this case in the new list of events is identical to the second event of this case in the initial list of events (before grouping), as we couldn't group it with other events.
- The third event of this case in the new list of events was created by grouping the fourth and fifth events of this case in the initial list of events (before grouping).
  - CaseId was 1 for the two events that were grouped, so it stays at 1 for the new single event.
  - Activity was C for the two events that were grouped, so it stays at C for the new single event.
  - StartDate was 10/10/10 11:50 for the first event that was grouped, and 10/10/10 12:50 for the second one. The lowest timestamp (10/10/10 11:50) is used as the start timestamp of the new single event.
  - EndDate was 11/10/10 11:50 for the first event that was grouped, and 11/10/10 12:50 for the second one. The highest timestamp (11/10/10 12:50) is used as the end timestamp of the new single event.
  - Country was Germany for the two events that were grouped, so it stays at Germany for the new single event.
  - City was Munich for the first event that was grouped, and Hamburg for the second one. In the column mapping, FIRST was defined as the groupedTasksAggregation for this dimension; consequently, as Munich is the first value to come, it is the one used for the new single event.
  - Price was 10 for the first event that was grouped, and 20 for the second one. In the column mapping, AVG was defined as the groupedTasksAggregation for this metric; consequently, 15 is the value of this metric for the new single event (15 being the average of 10 and 20).
For CaseId 2:
- The first event of this case in the new list of events was created by grouping the first and third events of this case in the initial list of events (before grouping).
  - CaseId was 2 for the two events that were grouped, so it stays at 2 for the new single event.
  - Activity was A for the two events that were grouped, so it stays at A for the new single event.
  - StartDate was 10/10/10 08:20 for the first event that was grouped, and 10/10/10 10:40 for the second one. The lowest timestamp (10/10/10 08:20) is used as the start timestamp of the new single event.
  - EndDate was 11/10/10 08:20 for the first event that was grouped, and 11/10/10 10:40 for the second one. The highest timestamp (11/10/10 10:40) is used as the end timestamp of the new single event.
  - Country was France for the two events that were grouped, so it stays at France for the new single event.
  - City was Rennes for the first event that was grouped, and Bordeaux for the second one. In the column mapping, FIRST was defined as the groupedTasksAggregation for this dimension; consequently, as Rennes is the first value to come, it is the one used for the new single event.
  - Price was 5 for the first event that was grouped, and 25 for the second one. In the column mapping, AVG was defined as the groupedTasksAggregation for this metric; consequently, 15 is the value of this metric for the new single event (15 being the average of 5 and 25).
- The second event of this case in the new list of events is identical to the second event of this case in the initial list of events (before grouping), as we couldn't group it with other events.
- The third event of this case in the new list of events is identical to the fourth event of this case in the initial list of events (before grouping), as we couldn't group it with other events.
This new list of events is then used as the data in the Mining project.
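As an illustration only, the grouping in this example can be reproduced with a short Python sketch; it mimics the behaviour described above and is not the connector's implementation:

```python
from collections import OrderedDict
from datetime import datetime
from statistics import mean

# Sketch reproducing the example: within a case, events sharing the same Activity
# and Country are merged; City uses FIRST, Price uses AVG, StartDate takes the
# lowest timestamp and EndDate the highest.
FMT = "%d/%m/%y %H:%M"  # matches the dd/MM/yy HH:mm date format of the mapping

events = [
    # (CaseId, Activity, StartDate, EndDate, Country, City, Price)
    ("1", "A", "10/10/10 08:38", "11/10/10 08:38", "France",  "Paris",    10),
    ("1", "B", "10/10/10 09:40", "11/10/10 09:40", "Germany", "Berlin",   20),
    ("1", "A", "10/10/10 10:42", "11/10/10 10:42", "France",  "Toulouse", 30),
    ("1", "C", "10/10/10 11:50", "11/10/10 11:50", "Germany", "Munich",   10),
    ("1", "C", "10/10/10 12:50", "11/10/10 12:50", "Germany", "Hamburg",  20),
    ("2", "A", "10/10/10 08:20", "11/10/10 08:20", "France",  "Rennes",    5),
    ("2", "B", "10/10/10 09:30", "11/10/10 09:30", "Germany", "Berlin",   10),
    ("2", "A", "10/10/10 10:40", "11/10/10 10:40", "France",  "Bordeaux", 25),
    ("2", "A", "10/10/10 11:50", "11/10/10 11:50", "USA",     "New York", 10),
]

groups = OrderedDict()
for case_id, activity, start, end, country, city, price in events:
    # Grouping key: the case plus the grouped tasks columns (Activity, Country).
    groups.setdefault((case_id, activity, country), []).append((start, end, city, price))

for (case_id, activity, country), members in groups.items():
    start = min((m[0] for m in members), key=lambda s: datetime.strptime(s, FMT))
    end = max((m[1] for m in members), key=lambda s: datetime.strptime(s, FMT))
    city = members[0][2]                 # FIRST aggregation for the City dimension
    price = mean(m[3] for m in members)  # AVG aggregation for the Price metric
    print(case_id, activity, start, end, country, city, price)
```

Running this sketch prints the six grouped events of the table above.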
As a side note, if for the same initial list of events we don't want to group any events together, the column mapping should be:
columnMapping.create = "true",
columnMapping.caseIdColumnIndex = "0",
columnMapping.activityColumnIndex = "1",
columnMapping.timeInformationList = "{2;dd/MM/yy HH:mm},{3;dd/MM/yy HH:mm}",
columnMapping.dimensionsInformationList = "[{"columnIndex": 4, "name": "Country", "isCaseScope": false},{"columnIndex": 5, "name": "City", "isCaseScope": false}]",
columnMapping.metricsInformationList = "[{"columnIndex": 6, "name": "Price", "unit": "Euros", "isCaseScope": true, "aggregation": "MIN"}]"
Remarks regarding the Column Mapping for grouped tasks
- An error will appear at the creation of the connector if the columnMapping.groupedTasksColumns property is defined but doesn't contain at least one column index of a time, dimension or metric column.
- An error will appear at the creation of the connector if the columnMapping.groupedTasksColumns property is defined but the groupedTasksAggregation argument is not defined for all the dimensions and/or metrics.
- An error will appear at the creation of the connector if the columnMapping.groupedTasksColumns property is not defined but at least one dimension/metric defines its groupedTasksAggregation argument.
- If the columnMapping.groupedTasksColumns property is defined without the column index of the activity column, the connector will automatically add it to the set of grouped tasks columns indexes that is sent to the Mining.
- If the columnMapping.groupedTasksColumns property is defined with the column index of the caseId column, the connector will automatically remove it from the set of grouped tasks columns indexes that is sent to the Mining.