Skip to content

Commit

Permalink
remove broken things; fix sending
Browse files Browse the repository at this point in the history
  • Loading branch information
JJTech0130 committed Jul 12, 2024
1 parent 66a263d commit 4dcd516
Show file tree
Hide file tree
Showing 6 changed files with 674 additions and 529 deletions.
134 changes: 132 additions & 2 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connector

import (
"context"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -14,15 +15,21 @@ import (
"go.mau.fi/mautrix-meta/config"
"go.mau.fi/mautrix-meta/messagix"
"go.mau.fi/mautrix-meta/messagix/cookies"

//"go.mau.fi/mautrix-meta/messagix/socket"
"go.mau.fi/mautrix-meta/messagix/table"
"go.mau.fi/mautrix-meta/messagix/types"

"go.mau.fi/mautrix-meta/pkg/connector/msgconv"

"maunium.net/go/mautrix/bridge/status"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/event"

//"maunium.net/go/mautrix/event"
metaTypes "go.mau.fi/mautrix-meta/messagix/types"
)

type metaEvent struct {
Expand Down Expand Up @@ -285,9 +292,132 @@ func (m *MetaClient) GetUserInfo(ctx context.Context, ghost *bridgev2.Ghost) (*b
panic("unimplemented")
}

type msgconvContextKey int

const (
msgconvContextKeyIntent msgconvContextKey = iota
msgconvContextKeyClient
msgconvContextKeyE2EEClient
msgconvContextKeyBackfill
)

// HandleMatrixMessage implements bridgev2.NetworkAPI.
func (m *MetaClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (message *bridgev2.MatrixMessageResponse, err error) {
panic("unimplemented")
func (m *MetaClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (*bridgev2.MatrixMessageResponse, error) {
log := zerolog.Ctx(ctx)

content, ok := msg.Event.Content.Parsed.(*event.MessageEventContent)
if !ok {
log.Error().Type("content_type", content).Msg("Unexpected parsed content type")
return nil, fmt.Errorf("unexpected parsed content type: %T", content)
}
if content.MsgType == event.MsgNotice /*&& !portal.bridge.Config.Bridge.BridgeNotices*/ {
log.Warn().Msg("Ignoring notice message")
return nil, nil
}

ctx = context.WithValue(ctx, msgconvContextKeyClient, m.client)

thread, err := strconv.Atoi(string(msg.Portal.ID))
if err != nil {
log.Err(err).Str("thread_id", string(msg.Portal.ID)).Msg("Failed to parse thread ID")
return nil, fmt.Errorf("failed to parse thread ID: %w", err)
}

tasks, otid, err := m.messageConverter.ToMeta(ctx, msg.Event, content, false, int64(thread))
if errors.Is(err, metaTypes.ErrPleaseReloadPage) {
log.Err(err).Msg("Got please reload page error while converting message, reloading page in background")
// go m.client.Disconnect()
// err = errReloading
panic("unimplemented")
} else if errors.Is(err, messagix.ErrTokenInvalidated) {
panic("unimplemented")
// go sender.DisconnectFromError(status.BridgeState{
// StateEvent: status.StateBadCredentials,
// Error: MetaCookieRemoved,
// })
// err = errLoggedOut
}

if err != nil {
log.Err(err).Msg("Failed to convert message")
//go ms.sendMessageMetrics(evt, err, "Error converting", true)
return nil, err
}

log.UpdateContext(func(c zerolog.Context) zerolog.Context {
return c.Int64("otid", otid)
})
log.Debug().Msg("Sending Matrix message to Meta")

//otidStr := strconv.FormatInt(otid, 10)
//portal.pendingMessages[otid] = evt.ID
//messageTS := time.Now()
var resp *table.LSTable

retries := 0
for retries < 5 {
if err = m.client.WaitUntilCanSendMessages(15 * time.Second); err != nil {
log.Err(err).Msg("Error waiting to be able to send messages, retrying")
} else {
resp, err = m.client.ExecuteTasks(tasks...)
if err == nil {
break
}
log.Err(err).Msg("Failed to send message to Meta, retrying")
}
retries++
}

log.Trace().Any("response", resp).Msg("Meta send response")
// var msgID string
// if resp != nil && err == nil {
// for _, replace := range resp.LSReplaceOptimsiticMessage {
// if replace.OfflineThreadingId == otidStr {
// msgID = replace.MessageId
// }
// }
// if len(msgID) == 0 {
// for _, failed := range resp.LSMarkOptimisticMessageFailed {
// if failed.OTID == otidStr {
// log.Warn().Str("message", failed.Message).Msg("Sending message failed")
// //go ms.sendMessageMetrics(evt, fmt.Errorf("%w: %s", errServerRejected, failed.Message), "Error sending", true)
// return
// }
// }
// for _, failed := range resp.LSHandleFailedTask {
// if failed.OTID == otidStr {
// log.Warn().Str("message", failed.Message).Msg("Sending message failed")
// go ms.sendMessageMetrics(evt, fmt.Errorf("%w: %s", errServerRejected, failed.Message), "Error sending", true)
// return
// }
// }
// log.Warn().Msg("Message send response didn't include message ID")
// }
// }
// if msgID != "" {
// portal.pendingMessagesLock.Lock()
// _, ok = portal.pendingMessages[otid]
// if ok {
// portal.storeMessageInDB(ctx, evt.ID, msgID, otid, sender.MetaID, messageTS, 0)
// delete(portal.pendingMessages, otid)
// } else {
// log.Debug().Msg("Not storing message send response: pending message was already removed from map")
// }
// portal.pendingMessagesLock.Unlock()
// }

return &bridgev2.MatrixMessageResponse{
DB: &database.Message{
//ID: nil,
MXID: msg.Event.ID,
Room: networkid.PortalKey{ID: msg.Portal.ID},
//SenderID: nil,
Timestamp: time.Time{},
},
}, nil

// timings.totalSend = time.Since(start)
// go ms.sendMessageMetrics(evt, err, "Error sending", true)
}

// IsLoggedIn implements bridgev2.NetworkAPI.
Expand Down
176 changes: 89 additions & 87 deletions pkg/connector/msgconv/from-matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ import (
"context"
"errors"
"fmt"
"net/http"

//"net/http"
"time"

"github.com/rs/zerolog"
"go.mau.fi/util/exerrors"
"go.mau.fi/util/exmime"
"go.mau.fi/util/ffmpeg"
//"github.com/rs/zerolog"
//"go.mau.fi/util/exerrors"
//"go.mau.fi/util/exmime"
//"go.mau.fi/util/ffmpeg"
"maunium.net/go/mautrix/event"

"go.mau.fi/mautrix-meta/messagix"
//"go.mau.fi/mautrix-meta/messagix"
"go.mau.fi/mautrix-meta/messagix/methods"
"go.mau.fi/mautrix-meta/messagix/socket"
"go.mau.fi/mautrix-meta/messagix/table"
"go.mau.fi/mautrix-meta/messagix/types"
//"go.mau.fi/mautrix-meta/messagix/types"
)

var (
Expand All @@ -46,20 +47,21 @@ var (
ErrURLNotFound = errors.New("url not found")
)

func (mc *MessageConverter) ToMeta(ctx context.Context, evt *event.Event, content *event.MessageEventContent, relaybotFormatted bool) ([]socket.Task, int64, error) {
func (mc *MessageConverter) ToMeta(ctx context.Context, evt *event.Event, content *event.MessageEventContent, relaybotFormatted bool, threadID int64) ([]socket.Task, int64, error) {
if evt.Type == event.EventSticker {
content.MsgType = event.MsgImage
}

task := &socket.SendMessageTask{
ThreadId: mc.GetData(ctx).ThreadID,
//ThreadId: mc.GetData(ctx).ThreadID,
ThreadId: threadID,
Otid: methods.GenerateEpochId(),
Source: table.MESSENGER_INBOX_IN_THREAD,
InitiatingSource: table.FACEBOOK_INBOX,
SendType: table.TEXT,
SyncGroup: 1,

ReplyMetaData: mc.GetMetaReply(ctx, content),
//ReplyMetaData: mc.GetMetaReply(ctx, content),
}
if content.MsgType == event.MsgEmote && !relaybotFormatted {
content.Body = "/me " + content.Body
Expand All @@ -70,22 +72,22 @@ func (mc *MessageConverter) ToMeta(ctx context.Context, evt *event.Event, conten
switch content.MsgType {
case event.MsgText, event.MsgNotice, event.MsgEmote:
task.Text = content.Body
case event.MsgImage, event.MsgVideo, event.MsgAudio, event.MsgFile:
resp, err := mc.reuploadFileToMeta(ctx, evt, content)
if err != nil {
return nil, 0, err
}
attachmentID := resp.Payload.RealMetadata.GetFbId()
if attachmentID == 0 {
zerolog.Ctx(ctx).Warn().RawJSON("response", resp.Raw).Msg("No fbid received for upload")
return nil, 0, fmt.Errorf("failed to upload attachment: fbid not received")
}
task.SendType = table.MEDIA
task.AttachmentFBIds = []int64{attachmentID}
if content.FileName != "" && content.Body != content.FileName {
// This might not actually be allowed
task.Text = content.Body
}
// case event.MsgImage, event.MsgVideo, event.MsgAudio, event.MsgFile:
// resp, err := mc.reuploadFileToMeta(ctx, evt, content)
// if err != nil {
// return nil, 0, err
// }
// attachmentID := resp.Payload.RealMetadata.GetFbId()
// if attachmentID == 0 {
// zerolog.Ctx(ctx).Warn().RawJSON("response", resp.Raw).Msg("No fbid received for upload")
// return nil, 0, fmt.Errorf("failed to upload attachment: fbid not received")
// }
// task.SendType = table.MEDIA
// task.AttachmentFBIds = []int64{attachmentID}
// if content.FileName != "" && content.Body != content.FileName {
// // This might not actually be allowed
// task.Text = content.Body
// }
case event.MsgLocation:
// TODO implement
fallthrough
Expand All @@ -101,65 +103,65 @@ func (mc *MessageConverter) ToMeta(ctx context.Context, evt *event.Event, conten
return []socket.Task{task, readTask}, task.Otid, nil
}

func (mc *MessageConverter) downloadMatrixMedia(ctx context.Context, content *event.MessageEventContent) (data []byte, mimeType, fileName string, err error) {
mxc := content.URL
if content.File != nil {
mxc = content.File.URL
}
data, err = mc.DownloadMatrixMedia(ctx, mxc)
if err != nil {
err = exerrors.NewDualError(ErrMediaDownloadFailed, err)
return
}
if content.File != nil {
err = content.File.DecryptInPlace(data)
if err != nil {
err = exerrors.NewDualError(ErrMediaDecryptFailed, err)
return
}
}
mimeType = content.GetInfo().MimeType
if mimeType == "" {
mimeType = http.DetectContentType(data)
}
fileName = content.FileName
if fileName == "" {
fileName = content.Body
if fileName == "" {
fileName = string(content.MsgType)[2:] + exmime.ExtensionFromMimetype(mimeType)
}
}
return
}
// func (mc *MessageConverter) downloadMatrixMedia(ctx context.Context, content *event.MessageEventContent) (data []byte, mimeType, fileName string, err error) {
// mxc := content.URL
// if content.File != nil {
// mxc = content.File.URL
// }
// data, err = mc.DownloadMatrixMedia(ctx, mxc)
// if err != nil {
// err = exerrors.NewDualError(ErrMediaDownloadFailed, err)
// return
// }
// if content.File != nil {
// err = content.File.DecryptInPlace(data)
// if err != nil {
// err = exerrors.NewDualError(ErrMediaDecryptFailed, err)
// return
// }
// }
// mimeType = content.GetInfo().MimeType
// if mimeType == "" {
// mimeType = http.DetectContentType(data)
// }
// fileName = content.FileName
// if fileName == "" {
// fileName = content.Body
// if fileName == "" {
// fileName = string(content.MsgType)[2:] + exmime.ExtensionFromMimetype(mimeType)
// }
// }
// return
// }

func (mc *MessageConverter) reuploadFileToMeta(ctx context.Context, evt *event.Event, content *event.MessageEventContent) (*types.MercuryUploadResponse, error) {
threadID := mc.GetData(ctx).ThreadID
data, mimeType, fileName, err := mc.downloadMatrixMedia(ctx, content)
if err != nil {
return nil, err
}
_, isVoice := evt.Content.Raw["org.matrix.msc3245.voice"]
if isVoice {
data, err = ffmpeg.ConvertBytes(ctx, data, ".m4a", []string{}, []string{"-c:a", "aac"}, mimeType)
if err != nil {
return nil, err
}
mimeType = "audio/mp4"
fileName += ".m4a"
}
resp, err := mc.GetClient(ctx).SendMercuryUploadRequest(ctx, threadID, &messagix.MercuryUploadMedia{
Filename: fileName,
MimeType: mimeType,
MediaData: data,
IsVoiceClip: isVoice,
})
if err != nil {
zerolog.Ctx(ctx).Debug().
Str("file_name", fileName).
Str("mime_type", mimeType).
Bool("is_voice_clip", isVoice).
Msg("Failed upload metadata")
return nil, fmt.Errorf("%w: %w", ErrMediaUploadFailed, err)
}
return resp, nil
}
// func (mc *MessageConverter) reuploadFileToMeta(ctx context.Context, evt *event.Event, content *event.MessageEventContent) (*types.MercuryUploadResponse, error) {
// threadID := mc.GetData(ctx).ThreadID
// data, mimeType, fileName, err := mc.downloadMatrixMedia(ctx, content)
// if err != nil {
// return nil, err
// }
// _, isVoice := evt.Content.Raw["org.matrix.msc3245.voice"]
// if isVoice {
// data, err = ffmpeg.ConvertBytes(ctx, data, ".m4a", []string{}, []string{"-c:a", "aac"}, mimeType)
// if err != nil {
// return nil, err
// }
// mimeType = "audio/mp4"
// fileName += ".m4a"
// }
// resp, err := mc.GetClient(ctx).SendMercuryUploadRequest(ctx, threadID, &messagix.MercuryUploadMedia{
// Filename: fileName,
// MimeType: mimeType,
// MediaData: data,
// IsVoiceClip: isVoice,
// })
// if err != nil {
// zerolog.Ctx(ctx).Debug().
// Str("file_name", fileName).
// Str("mime_type", mimeType).
// Bool("is_voice_clip", isVoice).
// Msg("Failed upload metadata")
// return nil, fmt.Errorf("%w: %w", ErrMediaUploadFailed, err)
// }
// return resp, nil
// }
Loading

0 comments on commit 4dcd516

Please sign in to comment.