From 3c2a78a47c75cd95572893587b6d1f94cd6b6189 Mon Sep 17 00:00:00 2001 From: fengzhiquan Date: Mon, 2 Nov 2020 18:18:52 +0800 Subject: [PATCH 1/6] FIX:fatal error: concurrent map read and map write(apache#544) --- .gitignore | 3 ++- consumer/offset_store.go | 38 +++++++++++++++++++++-------------- consumer/offset_store_test.go | 7 +++---- go.sum | 1 + 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index a0c292d1..99fd04d1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ go.mod go.sum vendor/ coverage.txt -examples/test \ No newline at end of file +examples/test +/.vscode \ No newline at end of file diff --git a/consumer/offset_store.go b/consumer/offset_store.go index 17f5d76a..4d0b271a 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -26,13 +26,12 @@ import ( "sync" "time" - jsoniter "github.com/json-iterator/go" - "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/internal/utils" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/rlog" + jsoniter "github.com/json-iterator/go" ) type readType int @@ -101,7 +100,7 @@ func (mq *MessageQueueKey) UnmarshalText(text []byte) error { type localFileOffsetStore struct { group string path string - OffsetTable map[MessageQueueKey]int64 + OffsetTable *sync.Map // concurrent safe , map[MessageQueueKey]int64 // mutex for offset file mutex sync.Mutex } @@ -110,7 +109,7 @@ func NewLocalFileOffsetStore(clientID, group string) OffsetStore { store := &localFileOffsetStore{ group: group, path: filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"), - OffsetTable: make(map[MessageQueueKey]int64), + OffsetTable: new(sync.Map), } store.load() return store @@ -151,7 +150,9 @@ func (local *localFileOffsetStore) load() { } if datas != nil { - local.OffsetTable = datas + for k, v := range datas { + local.OffsetTable.Store(k, v) + } } } @@ -180,17 +181,17 @@ func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int "new_offset": offset, }) key := MessageQueueKey(*mq) - localOffset, exist := local.OffsetTable[key] + localOffset, exist := local.OffsetTable.Load(key) if !exist { - local.OffsetTable[key] = offset + local.OffsetTable.Store(key, offset) return } if increaseOnly { - if localOffset < offset { - local.OffsetTable[key] = offset + if localOffset.(int64) < offset { + local.OffsetTable.Store(key, offset) } } else { - local.OffsetTable[key] = offset + local.OffsetTable.Store(key, offset) } } @@ -201,10 +202,17 @@ func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) { local.mutex.Lock() defer local.mutex.Unlock() + datas := make(map[MessageQueueKey]int64) + local.OffsetTable.Range(func(key, value interface{}) bool { + k := key.(MessageQueueKey) + v := value.(int64) + datas[k] = v + return true + }) + wrapper := OffsetSerializeWrapper{ - OffsetTable: local.OffsetTable, + OffsetTable: datas, } - data, _ := jsoniter.Marshal(wrapper) utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data)) } @@ -384,11 +392,11 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second) } -func readFromMemory(table map[MessageQueueKey]int64, mq *primitive.MessageQueue) int64 { - localOffset, exist := table[MessageQueueKey(*mq)] +func readFromMemory(table *sync.Map, mq *primitive.MessageQueue) int64 { + localOffset, exist := table.Load(MessageQueueKey(*mq)) if !exist { return -1 } - return localOffset + return localOffset.(int64) } diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go index 27b98d94..cfa0eaac 100644 --- a/consumer/offset_store_test.go +++ b/consumer/offset_store_test.go @@ -21,12 +21,11 @@ import ( "path/filepath" "testing" - "github.com/golang/mock/gomock" - . "github.com/smartystreets/goconvey/convey" - "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" ) func TestNewLocalFileOffsetStore(t *testing.T) { @@ -136,7 +135,7 @@ func TestLocalFileOffsetStore(t *testing.T) { offset = localStore.read(mq, _ReadFromStore) So(offset, ShouldEqual, 1) - delete(localStore.(*localFileOffsetStore).OffsetTable, MessageQueueKey(*mq)) + localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq)) offset = localStore.read(mq, _ReadMemoryThenStore) So(offset, ShouldEqual, 1) }) diff --git a/go.sum b/go.sum index 3b724dbd..e1d8bbd5 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/feiquan123/rocketmq-client-go v1.2.4 h1:1GKQZ5pr1m4P2wruQfKtIbr0w9ZScoDefCqEOB/4E3c= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= From eeb60bb620cd3a33fc1ed87225898a176d1cb183 Mon Sep 17 00:00:00 2001 From: fengzhiquan Date: Mon, 2 Nov 2020 18:20:13 +0800 Subject: [PATCH 2/6] go mod tidy --- go.sum | 1 - 1 file changed, 1 deletion(-) diff --git a/go.sum b/go.sum index e1d8bbd5..3b724dbd 100644 --- a/go.sum +++ b/go.sum @@ -3,7 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= -github.com/feiquan123/rocketmq-client-go v1.2.4 h1:1GKQZ5pr1m4P2wruQfKtIbr0w9ZScoDefCqEOB/4E3c= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= From 3894b465953ca56414e5cd21d7cbce983fab99c2 Mon Sep 17 00:00:00 2001 From: feiquan123 Date: Thu, 12 Nov 2020 00:55:26 +0800 Subject: [PATCH 3/6] update --- README.md | 1 + README.md~ | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 README.md~ diff --git a/README.md b/README.md index 477a5907..7f532770 100644 --- a/README.md +++ b/README.md @@ -53,3 +53,4 @@ For 2.0.0 version, it supports: ---------- ## License [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation + diff --git a/README.md~ b/README.md~ new file mode 100644 index 00000000..477a5907 --- /dev/null +++ b/README.md~ @@ -0,0 +1,55 @@ +## RocketMQ Client Go +[![TravisCI](https://travis-ci.org/apache/rocketmq-client-go.svg)](https://travis-ci.org/apache/rocketmq-client-go) +[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) +[![Go Report Card](https://goreportcard.com/badge/github.com/apache/rocketmq-client-go)](https://goreportcard.com/report/github.com/apache/rocketmq-client-go) +[![GoDoc](https://img.shields.io/badge/Godoc-reference-blue.svg)](https://godoc.org/github.com/apache/rocketmq-client-go) +[![CodeCov](https://codecov.io/gh/apache/rocketmq-client-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/rocketmq-client-go) +[![GitHub release](https://img.shields.io/badge/release-download-default.svg)](https://github.com/apache/rocketmq-client-go/releases) +[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-client-go.svg)](http://isitmaintained.com/project/apache/rocketmq-client-go "Average time to resolve an issue") +[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-client-go.svg)](http://isitmaintained.com/project/apache/rocketmq-client-go "Percentage of issues still open") +![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social) + +A product ready RocketMQ Client in pure go, which supports almost the full features of Apache RocketMQ, such as pub and sub messages, ACL, tracing and so on. + +---------- +## [Due Diligence](https://github.com/apache/rocketmq-client-go/issues/423) +[Here](https://github.com/apache/rocketmq-client-go/issues/423), we sincerely invite you to take a minute to feedback on your usage scenario. +[Click Here](https://github.com/apache/rocketmq-client-go/issues/423) or go to [ISSUE #423](https://github.com/apache/rocketmq-client-go/issues/423) if you accept. + +---------- +## Features +For 2.0.0 version, it supports: +* sending message in synchronous mode +* sending message in asynchronous mode +* sending message in oneway mode +* sending orderly messages +* consuming message using push model +* message tracing for pub and sub messages +* ACL for producers and consumers + +---------- +## How to use +* Step-by-step instruction are provided in [RocketMQ Go Client Introduction](docs/Introduction.md) +* Consult [RocketMQ Quick Start](https://rocketmq.apache.org/docs/quick-start/) to setup rocketmq broker and nameserver. + +---------- +## Apache RocketMQ Community +* [RocketMQ Community Projects](https://github.com/apache/rocketmq-externals) + +---------- +## Contact us +* Mailing Lists: +* Home: +* Docs: +* Issues: +* Ask: +* Slack: + +---------- +## How to Contribute + Contributions are warmly welcome! Be it trivial cleanup, major new feature or other suggestion. Read this [how to contribute](http://rocketmq.apache.org/docs/how-to-contribute/) guide for more details. + + +---------- +## License + [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation From 0b8ea30c4a6a41553c3e7795a5c9f5ef76a859a9 Mon Sep 17 00:00:00 2001 From: fengzhiquan Date: Wed, 2 Dec 2020 09:23:10 +0800 Subject: [PATCH 4/6] rm README.MD~ --- README.md~ | 55 ------------------------------------------------------ 1 file changed, 55 deletions(-) delete mode 100644 README.md~ diff --git a/README.md~ b/README.md~ deleted file mode 100644 index 477a5907..00000000 --- a/README.md~ +++ /dev/null @@ -1,55 +0,0 @@ -## RocketMQ Client Go -[![TravisCI](https://travis-ci.org/apache/rocketmq-client-go.svg)](https://travis-ci.org/apache/rocketmq-client-go) -[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) -[![Go Report Card](https://goreportcard.com/badge/github.com/apache/rocketmq-client-go)](https://goreportcard.com/report/github.com/apache/rocketmq-client-go) -[![GoDoc](https://img.shields.io/badge/Godoc-reference-blue.svg)](https://godoc.org/github.com/apache/rocketmq-client-go) -[![CodeCov](https://codecov.io/gh/apache/rocketmq-client-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/rocketmq-client-go) -[![GitHub release](https://img.shields.io/badge/release-download-default.svg)](https://github.com/apache/rocketmq-client-go/releases) -[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-client-go.svg)](http://isitmaintained.com/project/apache/rocketmq-client-go "Average time to resolve an issue") -[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-client-go.svg)](http://isitmaintained.com/project/apache/rocketmq-client-go "Percentage of issues still open") -![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social) - -A product ready RocketMQ Client in pure go, which supports almost the full features of Apache RocketMQ, such as pub and sub messages, ACL, tracing and so on. - ----------- -## [Due Diligence](https://github.com/apache/rocketmq-client-go/issues/423) -[Here](https://github.com/apache/rocketmq-client-go/issues/423), we sincerely invite you to take a minute to feedback on your usage scenario. -[Click Here](https://github.com/apache/rocketmq-client-go/issues/423) or go to [ISSUE #423](https://github.com/apache/rocketmq-client-go/issues/423) if you accept. - ----------- -## Features -For 2.0.0 version, it supports: -* sending message in synchronous mode -* sending message in asynchronous mode -* sending message in oneway mode -* sending orderly messages -* consuming message using push model -* message tracing for pub and sub messages -* ACL for producers and consumers - ----------- -## How to use -* Step-by-step instruction are provided in [RocketMQ Go Client Introduction](docs/Introduction.md) -* Consult [RocketMQ Quick Start](https://rocketmq.apache.org/docs/quick-start/) to setup rocketmq broker and nameserver. - ----------- -## Apache RocketMQ Community -* [RocketMQ Community Projects](https://github.com/apache/rocketmq-externals) - ----------- -## Contact us -* Mailing Lists: -* Home: -* Docs: -* Issues: -* Ask: -* Slack: - ----------- -## How to Contribute - Contributions are warmly welcome! Be it trivial cleanup, major new feature or other suggestion. Read this [how to contribute](http://rocketmq.apache.org/docs/how-to-contribute/) guide for more details. - - ----------- -## License - [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation From d1ce10479dc99b277b808add750dff683fb6a37d Mon Sep 17 00:00:00 2001 From: feiquan123 Date: Thu, 3 Dec 2020 22:00:38 +0800 Subject: [PATCH 5/6] rm README.md space lines:56-57 --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 7f532770..53c7880e 100644 --- a/README.md +++ b/README.md @@ -52,5 +52,4 @@ For 2.0.0 version, it supports: ---------- ## License - [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation - + [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation \ No newline at end of file From 2ddfde4d5ef5352ad159e62ec9de246e35adc5c9 Mon Sep 17 00:00:00 2001 From: fengzhiquan Date: Mon, 2 Nov 2020 18:20:13 +0800 Subject: [PATCH 6/6] [ISSUE #544] Fix bug, concurrent map read and map write local offest file update rm README.MD~ rm README.md space lines:56-57 --- README.md | 2 +- go.sum | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 477a5907..53c7880e 100644 --- a/README.md +++ b/README.md @@ -52,4 +52,4 @@ For 2.0.0 version, it supports: ---------- ## License - [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation + [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation \ No newline at end of file diff --git a/go.sum b/go.sum index e1d8bbd5..3b724dbd 100644 --- a/go.sum +++ b/go.sum @@ -3,7 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= -github.com/feiquan123/rocketmq-client-go v1.2.4 h1:1GKQZ5pr1m4P2wruQfKtIbr0w9ZScoDefCqEOB/4E3c= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=