Adding aggregations that keep the original adding order in source code #932

Closed
giangnvt opened this issue Feb 1, 2025 · 2 comments

Comments

@giangnvt

giangnvt commented Feb 1, 2025

Description

Java API client version
8.10.4

Java version
Java 17

Elasticsearch Version
8.11.4

Problem description
I have an Elasticsearch query like the one below, where I take the terms aggregation buckets, filter them with bucket_filter, and then paginate them with bucket_paging. If I execute this exact query, I get the expected output. But if I swap the order of bucket_filter and bucket_paging in the query, it returns fewer results than expected. My guess is that in the latter case Elasticsearch applies the bucket_paging pagination first (which returns at most 50 items) and then applies the bucket_filter filter, which removes a few more items from those 50.
I have also contacted the Elasticsearch support team, and they confirmed that the order of pipeline aggregations (like bucket_selector and bucket_sort) does affect the query result.
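The effect described above can be illustrated with a plain-Java simulation. This is only a model of the reported behaviour, not Elasticsearch's implementation: the `Bucket` record, the sample buckets, the page size of 3, and the single sort key (average score, descending) are hypothetical simplifications of the query in this issue.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineOrderDemo {

    // A terms bucket reduced to the two values the pipeline aggregations look at
    record Bucket(String key, long docCount, double avgScore) {}

    // Hypothetical sample buckets; "b" and "e" are below the doc-count threshold
    static final List<Bucket> SAMPLE = List.of(
            new Bucket("a", 40, 4.5),
            new Bucket("b", 10, 4.8),
            new Bucket("c", 35, 3.9),
            new Bucket("d", 50, 3.1),
            new Bucket("e", 5, 4.9));

    // Models bucket_selector: keep buckets whose doc count is at least minCount
    static List<Bucket> select(List<Bucket> buckets, long minCount) {
        return buckets.stream()
                .filter(b -> b.docCount() >= minCount)
                .collect(Collectors.toList());
    }

    // Models bucket_sort: sort by avgScore descending, then keep `size` buckets starting at `from`
    static List<Bucket> sortAndPage(List<Bucket> buckets, int from, int size) {
        return buckets.stream()
                .sorted(Comparator.comparingDouble(Bucket::avgScore).reversed())
                .skip(from)
                .limit(size)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Selector first, then sort/page: the page is filled with qualifying buckets
        List<Bucket> filterThenPage = sortAndPage(select(SAMPLE, 30), 0, 3);
        // Sort/page first, then selector: the page is cut before filtering
        List<Bucket> pageThenFilter = select(sortAndPage(SAMPLE, 0, 3), 30);

        System.out.println(filterThenPage.size()); // 3 (a, c, d)
        System.out.println(pageThenFilter.size()); // 1 (a)
    }
}
```

With the selector first, the page is filled from the buckets that pass the threshold; with the sort first, the page is cut before filtering and then shrinks, which matches the symptom reported above.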

My problem is that I'm using the elasticsearch-java client library to build the query, and it puts aggregations into a map instead of a list; as a result, the order of the aggregations in the final built query is arbitrary and does not follow the order in which I added them.
Is there any workaround so that I can fix this?

Source code (Kotlin):

val query = NativeQueryBuilder()
    .withQuery({
        MatchAllQuery.of { it }
    } ()._toQuery())
query.withSearchType(Query.SearchType.QUERY_THEN_FETCH)
      .withAggregation("by_planning_sum_id", Aggregation.of {
          it.terms { it.field("root_planning_sum_id")
                          .also{ aggregate -> "${maxBucketsSize}".let{ aggregate.size(it.toInt()) }}}
          .aggregations("country_data", Aggregation.of {
              it.filter( {
                  val subQuery = QueryBuilders.bool()
                      .apply {
                          if ("${sortName}".isNotEmpty()) {
                              must(TermQuery.of { it.field("${sortCode}").value("${sortName}") }._toQuery()
                              )
                          }
                      }
                  if (subQuery.hasClauses()) subQuery.build() else MatchAllQuery.of { it }
              } ()._toQuery())
              .aggregations("avg_score", Aggregation.of {
                  it.avg { it.field("review_score") }
              })})
          .aggregations(
              "zero_flag", Aggregation.of { it.bucketScript {
                  it.bucketsPath { it.dict(mapOf("count" to "country_data>_count")) }
                      .script {it.inline {it.source("return ((params.count == 0) ? 0 : 1)")}}
                      .gapPolicy(GapPolicy.InsertZeros) } })
          .aggregations("avg_score", Aggregation.of {
              it.avg { it.field("review_score") }
          })
          .aggregations("bad_count", Aggregation.of {
              it.filter( {
                  val subQuery = QueryBuilders.bool()
                      .must(TermQuery.of { it.field("review_score_class").value("bad") }._toQuery()
                      )
                  if (subQuery.hasClauses()) subQuery.build() else MatchAllQuery.of { it }
              } ()._toQuery())
          })
          .aggregations("quality_negative_count", Aggregation.of {
              it.filter( {
                  val subQuery = QueryBuilders.bool()
                      .must(TermQuery.of { it.field("quality_label_class").value("negative") }._toQuery()
                      )
                  if (subQuery.hasClauses()) subQuery.build() else MatchAllQuery.of { it }
              } ()._toQuery())
          })
          .aggregations("bad_ratio", Aggregation.of { it.bucketScript {
              it.bucketsPath { it.dict(mapOf("all" to "_count","bad" to "bad_count>_count"
              )) }.script { it.inline { it.source("params.bad/params.all") } } } })
          .aggregations(
              "bucket_filter", Aggregation.of { it.bucketSelector { it.bucketsPath { it.dict(mapOf(
                 "count" to "_count")) }
                    .script { it.inline { it.source("params.count>=${lowestCount}") } } } })
          .aggregations(
              "bucket_paging", Aggregation.of { it.bucketSort { it.sort(listOf(
                  SortOptions.of { it.field { it.field("zero_flag").order(SortOrder.Desc) } },
                  SortOptions.of { it.field { it.field("country_data>${sortKey}").order(if ("${sortValue}" == "asc") SortOrder.Asc else SortOrder.Desc) } },
                  SortOptions.of { it.field { it.field("${sortKey}").order(if ("${sortValue}" == "asc") SortOrder.Asc else SortOrder.Desc) } },
                  SortOptions.of { it.field { it.field("${sortKey2}").order(if ("${sortValue2}" == "asc") SortOrder.Asc else SortOrder.Desc) } }
              )).from("${pagerFrom}".toInt())
                  .size("${pagerSize}".toInt())} })
})

The query:

{
"aggregations": {
    "by_planning_sum_id": {
        "aggregations": {
            "bad_count": {
                "filter": {
                    "bool": {
                        "must": [{ "term": { "review_score_class": { "value": "bad" } } }]
                    }
                }
            },
            "country_data": {
                "aggregations": {
                    "avg_score": { "avg": { "field": "review_score" } }
                },
                "filter": {
                    "bool": {
                        "must": [{ "term": { "region_code": { "value": "JP" } } }]
                    }
                }
            },
            "bad_ratio": {
                "bucket_script": {
                    "buckets_path": { "all": "_count", "bad": "bad_count>_count" },
                    "script": { "source": "params.bad/params.all" }
                }
            },
            "zero_flag": {
                "bucket_script": {
                    "buckets_path": { "count": "country_data>_count" },
                    "gap_policy": "insert_zeros",
                    "script": { "source": "return ((params.count == 0) ? 0 : 1)" }
                }
            },
            "quality_negative_count": {
                "filter": {
                    "bool": {
                        "must": [
                            { "term": { "quality_label_class": { "value": "negative" } } }
                        ]
                    }
                }
            },
            "avg_score": { "avg": { "field": "review_score" } },
            "bucket_filter": {
                "bucket_selector": {
                    "buckets_path": { "count": "_count" },
                    "script": { "source": "params.count>=30" }
                }
            },
            "bucket_paging": {
                "bucket_sort": {
                    "from": 0,
                    "size": 50,
                    "sort": [
                        { "zero_flag": { "order": "desc" } },
                        { "country_data>avg_score": { "order": "desc" } },
                        { "avg_score": { "order": "desc" } },
                        { "_key": { "order": "desc" } }
                    ]
                }
            }
        },
        "terms": { "field": "root_planning_sum_id", "size": 10000 }
    }
},
"query": "..."
}
@l-trotta
Contributor

l-trotta commented Feb 4, 2025

Hello! I'm waiting for more information from the server team on this one, since the clients were built on the assumption that the order of JSON fields at the same level never matters. In the meantime, this query can be executed as is by bypassing the Java client and using the underlying low-level RestClient:

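import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Create the low-level client
try (RestClient restClient = RestClient
        .builder(HttpHost.create(serverUrl))
        .setDefaultHeaders(new Header[]{
            new BasicHeader("Authorization", "ApiKey " + apiKey),
        })
        .build()) {

    // Replace "index" with the target index: it goes in the path, _search has no "index" query parameter
    Request request = new Request("GET", "/index/_search");
    // Return aggregation keys as "type#name", which the Java client needs to deserialize aggregations
    request.addParameter("typed_keys", "true");
    // The body is sent exactly as written, so the aggregation order is preserved
    request.setJsonEntity("copypaste-query-here");

    Response binaryRes = restClient.performRequest(request);
}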

The response body has to be read from the raw HTTP response, but you should then be able to deserialize it into the client's SearchResponse type. I'll update this issue as soon as I have more information!
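The remaining step (reading the raw body and turning it into a `SearchResponse`) might look like the sketch below. It assumes elasticsearch-java 8.x with the Jackson-based `JacksonJsonpMapper` and the Apache HttpClient 4 based low-level `RestClient`; the class and method names `RawSearch`, `search`, and `parse` are hypothetical, and `JsonData` is used as the document type so that no mapping class is needed.

```java
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import jakarta.json.stream.JsonParser;
import java.io.IOException;
import java.io.StringReader;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class RawSearch {

    private static final JsonpMapper MAPPER = new JacksonJsonpMapper();

    // Sends a hand-written search body unchanged, so the aggregation order is exactly as typed
    static SearchResponse<JsonData> search(RestClient restClient, String index, String queryJson)
            throws IOException {
        Request request = new Request("GET", "/" + index + "/_search");
        // The client's aggregate deserializer expects "type#name" keys in the response
        request.addParameter("typed_keys", "true");
        request.setJsonEntity(queryJson);
        Response response = restClient.performRequest(request);
        return parse(EntityUtils.toString(response.getEntity()));
    }

    // Turns a raw _search response body into the client's SearchResponse type
    static SearchResponse<JsonData> parse(String responseJson) {
        try (JsonParser parser = MAPPER.jsonProvider().createParser(new StringReader(responseJson))) {
            return SearchResponse.createSearchResponseDeserializer(JsonData._DESERIALIZER)
                    .deserialize(parser, MAPPER);
        }
    }
}
```

Because the body is never rebuilt by the typed builders, bucket_filter stays ahead of bucket_paging exactly as it appears in the JSON.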

@giangnvt
Author

giangnvt commented Feb 6, 2025

@l-trotta
We can't easily do that, because we use a framework to handle these ES queries and it would need a huge rework to apply such a change. Thanks anyway for the suggestion.
By the way, going deeper into the related aggregation builder interface, it turns out that it can also take a whole map of aggregations at once. So we can resolve this issue by storing the aggregations in a LinkedHashMap in the desired order.
So I think this issue can be closed.
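A minimal Java sketch of that workaround, assuming elasticsearch-java 8.10.x as in the issue (the `script(s -> s.inline(...))` form matches that version; later client versions may differ). The class and method names are hypothetical, and only three of the sub-aggregations from the query above are shown; the others (country_data, zero_flag, bad_ratio, ...) would be put into the same map before bucket_filter.

```java
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import java.util.LinkedHashMap;
import java.util.Map;

public class OrderedSubAggregations {

    // Builds the terms aggregation with its sub-aggregations in an insertion-ordered map
    static Aggregation byPlanningSumId(int lowestCount, int from, int size) {
        // LinkedHashMap iterates in insertion order, so the JSON follows the order of put() calls
        Map<String, Aggregation> subAggs = new LinkedHashMap<>();

        // Metric used as the sort key below
        subAggs.put("avg_score", Aggregation.of(a -> a
                .avg(avg -> avg.field("review_score"))));

        // bucket_selector first: drop buckets with fewer than lowestCount documents
        subAggs.put("bucket_filter", Aggregation.of(a -> a
                .bucketSelector(sel -> sel
                        .bucketsPath(p -> p.dict(Map.of("count", "_count")))
                        .script(s -> s.inline(i -> i.source("params.count >= " + lowestCount))))));

        // bucket_sort second: sort and page only the buckets that survived the filter
        subAggs.put("bucket_paging", Aggregation.of(a -> a
                .bucketSort(sort -> sort
                        .sort(so -> so.field(f -> f.field("avg_score").order(SortOrder.Desc)))
                        .from(from)
                        .size(size))));

        return Aggregation.of(a -> a
                .terms(t -> t.field("root_planning_sum_id").size(10000))
                // Hand over the complete map in a single call and add nothing after it
                .aggregations(subAggs));
    }

    public static void main(String[] args) {
        Aggregation agg = byPlanningSumId(30, 0, 50);
        System.out.println(agg.aggregations().keySet());
    }
}
```

Passing the complete map in one call is the important part: adding further entries afterwards through the single-entry `aggregations(name, ...)` overload may make the builder copy everything into its own internal map, which is not guaranteed to keep the insertion order.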
