@@ -8,24 +8,23 @@ import (
88 "math"
99 "sync"
1010
11- "github.com/openimsdk/openim-sdk-core/v3/pkg/api"
12- "github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
13- "github.com/openimsdk/protocol/msg"
14- "github.com/openimsdk/tools/utils/stringutil"
15-
1611 "github.com/openimsdk/openim-sdk-core/v3/internal/group"
1712 "github.com/openimsdk/openim-sdk-core/v3/internal/interaction"
1813 "github.com/openimsdk/openim-sdk-core/v3/internal/relation"
1914 "github.com/openimsdk/openim-sdk-core/v3/internal/third/file"
2015 "github.com/openimsdk/openim-sdk-core/v3/internal/user"
2116 "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
17+ "github.com/openimsdk/openim-sdk-core/v3/pkg/api"
18+ "github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
2219 "github.com/openimsdk/openim-sdk-core/v3/pkg/common"
2320 "github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
21+ "github.com/openimsdk/openim-sdk-core/v3/pkg/converter"
2422 "github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
2523 "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
2624 "github.com/openimsdk/openim-sdk-core/v3/pkg/page"
2725 "github.com/openimsdk/openim-sdk-core/v3/pkg/syncer"
2826 pbConversation "github.com/openimsdk/protocol/conversation"
27+ "github.com/openimsdk/protocol/msg"
2928 "github.com/openimsdk/protocol/sdkws"
3029 "github.com/openimsdk/tools/errs"
3130 "github.com/openimsdk/tools/log"
@@ -233,15 +232,30 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
233232 conversationSet := make (map [string ]* model_struct.LocalConversation )
234233 phConversationChangedSet := make (map [string ]* model_struct.LocalConversation )
235234 phNewConversationSet := make (map [string ]* model_struct.LocalConversation )
235+ conversationIDs := make ([]string , 0 , len (allMsg ))
236236
237237 log .ZDebug (ctx , "message come here conversation ch" , "conversation length" , len (allMsg ))
238238 b := time .Now ()
239239
240240 onlineMap := make (map [onlineMsgKey ]struct {})
241241
242242 for conversationID , msgs := range allMsg {
243- log .ZDebug (ctx , "parse message in one conversation" , "conversationID" ,
244- conversationID , "message length" , len (msgs .Msgs ))
243+ conversationIDs = append (conversationIDs , conversationID )
244+ log .ZDebug (ctx , "parse message in one conversation" , "conversationID" , conversationID , "message length" , len (msgs .Msgs ), "data" , msgs .Msgs )
245+
246+ clientIDs := make ([]string , 0 , len (msgs .Msgs ))
247+ for _ , msg := range msgs .Msgs {
248+ clientIDs = append (clientIDs , msg .ClientMsgID )
249+ }
250+
251+ clientMsgs , err := c .db .GetMessagesByClientMsgIDs (ctx , conversationID , clientIDs )
252+ if err != nil {
253+ log .ZWarn (ctx , "GetMessagesByClientMsgIDs failed" , err , "conversationID" , conversationID , "clientIDs" , clientIDs )
254+ }
255+ clientMsgMap := datautil .SliceToMap (clientMsgs , func (e * model_struct.LocalChatLog ) string {
256+ return e .ClientMsgID
257+ })
258+
245259 var insertMessage , selfInsertMessage , othersInsertMessage []* model_struct.LocalChatLog
246260 var updateMessage []* model_struct.LocalChatLog
247261
@@ -257,17 +271,11 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
257271
258272 isSenderConversationUpdate = utils .GetSwitchFromOptions (v .Options , constant .IsSenderConversationUpdate )
259273
260- msg := & sdk_struct.MsgStruct {}
261- copier .Copy (msg , v )
262- msg .Content = string (v .Content )
263-
264- var attachedInfo sdk_struct.AttachedInfoElem
265- _ = utils .JsonStringToStruct (v .AttachedInfo , & attachedInfo )
266- msg .AttachedInfoElem = & attachedInfo
274+ msg := converter .MsgDataToMsgStruct (v )
267275
268276 //When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update.
269277 if msg .Status == constant .MsgStatusHasDeleted {
270- dbMessage := MsgStructToLocalChatLog (msg )
278+ dbMessage := converter . MsgStructToLocalChatLog (msg )
271279 c .handleExceptionMessages (ctx , nil , dbMessage )
272280 exceptionMsg = append (exceptionMsg , dbMessage )
273281 insertMessage = append (insertMessage , dbMessage )
@@ -277,7 +285,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
277285 msg .Status = constant .MsgStatusSendSuccess
278286
279287 //De-analyze data
280- err := msgHandleByContentType (msg )
288+ err := converter . MsgHandleByContentType (msg )
281289 if err != nil {
282290 log .ZError (ctx , "Parsing data error:" , err , "type: " , msg .ContentType , "msg" , msg )
283291 continue
@@ -298,16 +306,16 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
298306 log .ZDebug (ctx , "decode message" , "msg" , msg )
299307 if v .SendID == c .loginUserID { //seq
300308 // Messages sent by myself //if sent through this terminal
301- existingMsg , err := c . db . GetMessage ( ctx , conversationID , msg .ClientMsgID )
302- if err == nil {
309+ existingMsg , ok := clientMsgMap [ msg .ClientMsgID ]
310+ if ok {
303311 log .ZInfo (ctx , "have message" , "msg" , msg )
304312 if existingMsg .Seq == 0 {
305313 if ! isConversationUpdate {
306314 msg .Status = constant .MsgStatusFiltered
307315 }
308- updateMessage = append (updateMessage , MsgStructToLocalChatLog (msg ))
316+ updateMessage = append (updateMessage , converter . MsgStructToLocalChatLog (msg ))
309317 } else {
310- dbMessage := MsgStructToLocalChatLog (msg )
318+ dbMessage := converter . MsgStructToLocalChatLog (msg )
311319 c .handleExceptionMessages (ctx , existingMsg , dbMessage )
312320 insertMessage = append (insertMessage , dbMessage )
313321 exceptionMsg = append (exceptionMsg , dbMessage )
@@ -334,11 +342,12 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
334342 newMessages = append (newMessages , msg )
335343 }
336344 if isHistory {
337- selfInsertMessage = append (selfInsertMessage , MsgStructToLocalChatLog (msg ))
345+ selfInsertMessage = append (selfInsertMessage , converter . MsgStructToLocalChatLog (msg ))
338346 }
339347 }
340348 } else { //Sent by others
341- if existingMsg , err := c .db .GetMessage (ctx , conversationID , msg .ClientMsgID ); err != nil {
349+ existingMsg , ok := clientMsgMap [msg .ClientMsgID ]
350+ if ! ok {
342351 lc := model_struct.LocalConversation {
343352 ConversationType : v .SessionType ,
344353 LatestMsg : utils .StructToJsonString (msg ),
@@ -368,11 +377,11 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
368377 newMessages = append (newMessages , msg )
369378 }
370379 if isHistory {
371- othersInsertMessage = append (othersInsertMessage , MsgStructToLocalChatLog (msg ))
380+ othersInsertMessage = append (othersInsertMessage , converter . MsgStructToLocalChatLog (msg ))
372381 }
373382
374383 } else {
375- dbMessage := MsgStructToLocalChatLog (msg )
384+ dbMessage := converter . MsgStructToLocalChatLog (msg )
376385 c .handleExceptionMessages (ctx , existingMsg , dbMessage )
377386 insertMessage = append (insertMessage , dbMessage )
378387 exceptionMsg = append (exceptionMsg , dbMessage )
@@ -390,19 +399,23 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
390399 c .conversationSyncMutex .Lock ()
391400 defer c .conversationSyncMutex .Unlock ()
392401
393- list , err := c .db .GetAllConversationListDB (ctx )
402+ list , err := c .db .GetMultipleConversationDB (ctx , conversationIDs )
394403 if err != nil {
395- log .ZError (ctx , "GetAllConversationListDB" , err )
404+ log .ZError (ctx , "GetMultipleConversationDB" , err , "conversationIDs" , conversationIDs )
405+ return
396406 }
397407
398- m := make (map [string ]* model_struct.LocalConversation )
399- listToMap (list , m )
400- log .ZDebug (ctx , "listToMap: " , "local conversation" , list , "generated c map" ,
401- string (stringutil .StructToJsonBytes (conversationSet )))
408+ var hList []* model_struct.LocalConversation
409+ m := datautil .SliceToMap (list , func (e * model_struct.LocalConversation ) string {
410+ if e .LatestMsgSendTime == 0 {
411+ hList = append (hList , e )
412+ }
413+ return e .ConversationID
414+ })
415+ log .ZDebug (ctx , "listToMap: " , "local conversation" , list , "generated c map" , conversationSet )
402416
403417 c .diff (ctx , m , conversationSet , conversationChangedSet , newConversationSet )
404- log .ZInfo (ctx , "trigger map is :" , "newConversations" , string (stringutil .StructToJsonBytes (newConversationSet )),
405- "changedConversations" , string (stringutil .StructToJsonBytes (conversationChangedSet )))
418+ log .ZInfo (ctx , "trigger map is :" , "newConversations" , newConversationSet , "changedConversations" , conversationChangedSet )
406419
407420 //seq sync message update
408421 if err := c .batchUpdateMessageList (ctx , updateMsg ); err != nil {
@@ -412,7 +425,6 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
412425 //Normal message storage
413426 _ = c .batchInsertMessageList (ctx , insertMsg )
414427
415- hList , _ := c .db .GetHiddenConversationList (ctx )
416428 for _ , v := range hList {
417429 if nc , ok := newConversationSet [v .ConversationID ]; ok {
418430 phConversationChangedSet [v .ConversationID ] = nc
0 commit comments