Skip to content

Commit 6da3cc8

Browse files
committed
Add optimistic concurrency control
This commit adds optimistic concurrency control as described in https://www.elastic.co/guide/en/elasticsearch/reference/7.3/optimistic-concurrency-control.html. Close #1143 Close #1154
1 parent 37f9df1 commit 6da3cc8

20 files changed

+611
-174
lines changed

bulk_delete_request.go

+41-21
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ import (
2020
// for details.
2121
type BulkDeleteRequest struct {
2222
BulkableRequest
23-
index string
24-
typ string
25-
id string
26-
parent string
27-
routing string
28-
version int64 // default is MATCH_ANY
29-
versionType string // default is "internal"
23+
index string
24+
typ string
25+
id string
26+
parent string
27+
routing string
28+
version int64 // default is MATCH_ANY
29+
versionType string // default is "internal"
30+
ifSeqNo *int64
31+
ifPrimaryTerm *int64
3032

3133
source []string
3234

@@ -38,13 +40,15 @@ type bulkDeleteRequestCommand map[string]bulkDeleteRequestCommandOp
3840

3941
//easyjson:json
4042
type bulkDeleteRequestCommandOp struct {
41-
Index string `json:"_index,omitempty"`
42-
Type string `json:"_type,omitempty"`
43-
Id string `json:"_id,omitempty"`
44-
Parent string `json:"parent,omitempty"`
45-
Routing string `json:"routing,omitempty"`
46-
Version int64 `json:"version,omitempty"`
47-
VersionType string `json:"version_type,omitempty"`
43+
Index string `json:"_index,omitempty"`
44+
Type string `json:"_type,omitempty"`
45+
Id string `json:"_id,omitempty"`
46+
Parent string `json:"parent,omitempty"`
47+
Routing string `json:"routing,omitempty"`
48+
Version int64 `json:"version,omitempty"`
49+
VersionType string `json:"version_type,omitempty"`
50+
IfSeqNo *int64 `json:"if_seq_no,omitempty"`
51+
IfPrimaryTerm *int64 `json:"if_primary_term,omitempty"`
4852
}
4953

5054
// NewBulkDeleteRequest returns a new BulkDeleteRequest.
@@ -116,6 +120,20 @@ func (r *BulkDeleteRequest) VersionType(versionType string) *BulkDeleteRequest {
116120
return r
117121
}
118122

123+
// IfSeqNo indicates to only perform the delete operation if the last
124+
// operation that has changed the document has the specified sequence number.
125+
func (r *BulkDeleteRequest) IfSeqNo(ifSeqNo int64) *BulkDeleteRequest {
126+
r.ifSeqNo = &ifSeqNo
127+
return r
128+
}
129+
130+
// IfPrimaryTerm indicates to only perform the delete operation if the
131+
// last operation that has changed the document has the specified primary term.
132+
func (r *BulkDeleteRequest) IfPrimaryTerm(ifPrimaryTerm int64) *BulkDeleteRequest {
133+
r.ifPrimaryTerm = &ifPrimaryTerm
134+
return r
135+
}
136+
119137
// String returns the on-wire representation of the delete request,
120138
// concatenated as a single string.
121139
func (r *BulkDeleteRequest) String() string {
@@ -136,13 +154,15 @@ func (r *BulkDeleteRequest) Source() ([]string, error) {
136154
}
137155
command := bulkDeleteRequestCommand{
138156
"delete": bulkDeleteRequestCommandOp{
139-
Index: r.index,
140-
Type: r.typ,
141-
Id: r.id,
142-
Routing: r.routing,
143-
Parent: r.parent,
144-
Version: r.version,
145-
VersionType: r.versionType,
157+
Index: r.index,
158+
Type: r.typ,
159+
Id: r.id,
160+
Routing: r.routing,
161+
Parent: r.parent,
162+
Version: r.version,
163+
VersionType: r.versionType,
164+
IfSeqNo: r.ifSeqNo,
165+
IfPrimaryTerm: r.ifPrimaryTerm,
146166
},
147167
}
148168

bulk_delete_request_easyjson.go

+54-18
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bulk_index_request.go

+20
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type BulkIndexRequest struct {
2929
doc interface{}
3030
pipeline string
3131
retryOnConflict *int
32+
ifSeqNo *int64
33+
ifPrimaryTerm *int64
3234

3335
source []string
3436

@@ -50,6 +52,8 @@ type bulkIndexRequestCommandOp struct {
5052
Version *int64 `json:"version,omitempty"`
5153
VersionType string `json:"version_type,omitempty"`
5254
Pipeline string `json:"pipeline,omitempty"`
55+
IfSeqNo *int64 `json:"if_seq_no,omitempty"`
56+
IfPrimaryTerm *int64 `json:"if_primary_term,omitempty"`
5357
}
5458

5559
// NewBulkIndexRequest returns a new BulkIndexRequest.
@@ -158,6 +162,20 @@ func (r *BulkIndexRequest) Pipeline(pipeline string) *BulkIndexRequest {
158162
return r
159163
}
160164

165+
// IfSeqNo indicates to only perform the index operation if the last
166+
// operation that has changed the document has the specified sequence number.
167+
func (r *BulkIndexRequest) IfSeqNo(ifSeqNo int64) *BulkIndexRequest {
168+
r.ifSeqNo = &ifSeqNo
169+
return r
170+
}
171+
172+
// IfPrimaryTerm indicates to only perform the index operation if the
173+
// last operation that has changed the document has the specified primary term.
174+
func (r *BulkIndexRequest) IfPrimaryTerm(ifPrimaryTerm int64) *BulkIndexRequest {
175+
r.ifPrimaryTerm = &ifPrimaryTerm
176+
return r
177+
}
178+
161179
// String returns the on-wire representation of the index request,
162180
// concatenated as a single string.
163181
func (r *BulkIndexRequest) String() string {
@@ -193,6 +211,8 @@ func (r *BulkIndexRequest) Source() ([]string, error) {
193211
VersionType: r.versionType,
194212
RetryOnConflict: r.retryOnConflict,
195213
Pipeline: r.pipeline,
214+
IfSeqNo: r.ifSeqNo,
215+
IfPrimaryTerm: r.ifPrimaryTerm,
196216
}
197217
command := bulkIndexRequestCommand{
198218
r.opType: indexCommand,

0 commit comments

Comments
 (0)