Skip to content

Latest commit

 

History

History
323 lines (305 loc) · 9.28 KB

painless-watcher-transform-context.asciidoc

File metadata and controls

323 lines (305 loc) · 9.28 KB

Watcher transform context

Use a Painless script as a {ref}/transform-script.html[watch transform] to transform a payload into a new payload for further use in the watch. Transform scripts return an Object value of the new payload.

Return

Object

The new payload.

API

The standard Painless API is available.

Example

To run the examples, first follow the steps in context examples.

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : { "term": { "sold": "true"} },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "transform" : {
      "script":
      """
        return [
          'money_makers': ctx.payload.aggregations.theatres.buckets.stream()  (1)
            .filter(t -> {                                                    (2)
                return t.money.value > 50000
            })
            .map(t -> {                                                       (3)
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList()),                                  (4)
          'duds' : ctx.payload.aggregations.theatres.buckets.stream()         (5)
            .filter(t -> {
                return t.money.value < 15000
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList())
          ]
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The output of the payload was transformed to {{ctx.payload}}"
        }
      }
    }
  }
}
  1. The Java Stream API is used in the transform. This API allows manipulation of the elements of the list in a pipeline.

  2. The stream filter removes items that do not meet the filter criteria.

  3. The stream map transforms each element into a new object.

  4. The collector reduces the stream to a java.util.List.

  5. This is done again for the second set of values in the transform.

The following action transform changes each value in the mod_log action into a String. This transform does not change the values in the unmod_log action.

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "actions" : {
      "mod_log" : {
        "transform": {                                                                (1)
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'msg': ctx.payload.aggregations.theatres.buckets.stream()
              .map(t-> formatter.format(t.money.value) + ' for the play ' + t.key)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The output of the payload was transformed to: {{ctx.payload.msg}}"
        }
      },
      "unmod_log" : {                                                                 (2)
        "logging" : {
          "text" : "The output of the payload was not transformed and this value should not exist: {{ctx.payload.msg}}"
        }
      }
    }
  }
}

This example uses the streaming API in a very similar manner. The differences below are subtle and worth calling out.

  1. The location of the transform is no longer at the top level, but is within an individual action.

  2. A second action that does not transform the payload is given for reference.

The following example shows scripted watch and action transforms within the context of a complete watch. This watch also uses a scripted condition.

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "high_threshold": 4000, "low_threshold": 1000 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": {
                      "field" : "cost",
                      "script": {
                       "source": "doc.cost.value * doc.number.value"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.aggregations.theatres.buckets.stream()
          .anyMatch(theatre -> theatre.money.value < ctx.metadata.low_threshold ||
                               theatre.money.value > ctx.metadata.high_threshold)
      """
    },
    "transform" : {
      "script":
      """
        return [
          'money_makers': ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value > ctx.metadata.high_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList()),
          'duds' : ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value < ctx.metadata.low_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList())
          ]
      """
    },
    "actions" : {
      "log_money_makers" : {
        "condition": {
          "script" : "return ctx.payload.money_makers.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.money_makers.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays contain the highest grossing total income: {{ctx.payload.plays_value}}"
        }
      },
      "log_duds" : {
        "condition": {
          "script" : "return ctx.payload.duds.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.duds.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays need more advertising due to their low total income: {{ctx.payload.plays_value}}"
        }
      }
    }
  }
}

The following example shows the use of metadata and transforming dates into a readable format.

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "min_hits": 10 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.hits.total > ctx.metadata.min_hits
      """
    },
    "transform" : {
      "script" :
      """
        def theDate = ZonedDateTime.ofInstant(ctx.execution_time.toInstant(), ctx.execution_time.getZone());
        return ['human_date': DateTimeFormatter.RFC_1123_DATE_TIME.format(theDate),
                'aggregations': ctx.payload.aggregations]
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The watch was successfully executed on {{ctx.payload.human_date}} and contained {{ctx.payload.aggregations.theatres.buckets.size}} buckets"
        }
      }
    }
  }
}