-
Notifications
You must be signed in to change notification settings - Fork 240
feat: add support to medatata to messages sent from the agents #415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Adds a richer “new conversation” message type so agent-initiated outbound messages can carry attachment metadata (e.g., generated images/files) to connectors, and updates connectors to consume it.
Changes:
- Introduce
types.ConversationMessage(OpenAI message + metadata) and update agent subscriber APIs to use it. - Accumulate action metadata onto
job.Metadataand send it along when thenew_conversationtool is invoked. - Update connectors (Telegram/Slack/Matrix/IRC/Email/Discord) to use the new subscriber payload; Telegram additionally sends images/songs/PDFs from metadata.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| services/connectors/telegram.go | Subscriber now receives ConversationMessage; sends media (images/songs/PDFs) from metadata before the text message. |
| services/connectors/slack.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/matrix.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/irc.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/email.go | Update subscriber to read message content from ccm.Message.Content. |
| services/connectors/discord.go | Update subscriber to read message content from ccm.Message.Content. |
| core/types/conversation.go | Add ConversationMessage type + helpers for attaching metadata. |
| core/agent/options.go | Change option type for new-conversation subscribers to func(*types.ConversationMessage). |
| core/agent/agent_test.go | Update subscriber test to handle ConversationMessage. |
| core/agent/agent.go | Change new-conversation channel/subscribers to ConversationMessage; merge action metadata into job metadata and forward it on new_conversation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Get accumulated metadata from job (e.g., images, files generated by previous actions in this job) | ||
| // This is per-job metadata, so parallel jobs won't interfere with each other | ||
| metadata := job.Metadata | ||
|
|
||
| go func(agent *Agent) { | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content) | ||
| agent.newConversations <- msg | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content, "metadata_keys", len(metadata)) | ||
| // Send ConversationMessage with both the message and accumulated metadata | ||
| agent.newConversations <- types.NewConversationMessage(msg).WithMetadata(metadata) | ||
| // Job metadata is automatically cleared when job finishes, no need to manually clear |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
metadata := job.Metadata passes the job’s metadata map by reference into a goroutine and then to subscribers. This risks data races or accidental mutation by subscribers. Create a defensive copy of the map (and of any slice values you expect) before sending it in ConversationMessage.
| // Get accumulated metadata from job (e.g., images, files generated by previous actions in this job) | ||
| // This is per-job metadata, so parallel jobs won't interfere with each other | ||
| metadata := job.Metadata | ||
|
|
||
| go func(agent *Agent) { | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content) | ||
| agent.newConversations <- msg | ||
| xlog.Info("Sending new conversation to channel", "agent", agent.Character.Name, "message", msg.Content, "metadata_keys", len(metadata)) | ||
| // Send ConversationMessage with both the message and accumulated metadata | ||
| agent.newConversations <- types.NewConversationMessage(msg).WithMetadata(metadata) | ||
| // Job metadata is automatically cleared when job finishes, no need to manually clear |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There’s no test asserting that action metadata is propagated to WithNewConversationSubscriber / AddSubscriber consumers. Add a unit/integration test that runs a job with an action returning metadata (e.g. images_url) followed by new_conversation, and verify the subscriber receives the expected metadata map.
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | ||
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | ||
| xlog.Debug("Sending photo from new conversation", "url", url) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | ||
| xlog.Error("Error handling image", "error", err) |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this subscriber, strconv.ParseInt(t.channelID, 10, 64) errors are ignored; if channelID is non-numeric (e.g. @channelusername) or malformed, chatID becomes 0 and media will be sent to an invalid chat. Parse the chat ID once before the loops, handle the error (log + return), and reuse the parsed value for all sends.
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | ||
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | ||
| xlog.Debug("Sending photo from new conversation", "url", url) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | ||
| xlog.Error("Error handling image", "error", err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Handle songs from generate_song action (local file paths) | ||
| if songPaths, exists := ccm.Metadata[actions.MetadataSongs]; exists { | ||
| for _, path := range xstrings.UniqueSlice(songPaths.([]string)) { | ||
| xlog.Debug("Sending song from new conversation", "path", path) |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The metadata values are asserted as ([]string) without checking the dynamic type (e.g. imagesUrls.([]string)). If any action (or user-defined action) populates these metadata keys with a different type (like []any), this will panic. Use a safe type assertion/type switch and skip/log unexpected types.
| // Handle images from gen image actions | ||
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | ||
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | ||
| xlog.Debug("Sending photo from new conversation", "url", url) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | ||
| xlog.Error("Error handling image", "error", err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Handle songs from generate_song action (local file paths) | ||
| if songPaths, exists := ccm.Metadata[actions.MetadataSongs]; exists { | ||
| for _, path := range xstrings.UniqueSlice(songPaths.([]string)) { | ||
| xlog.Debug("Sending song from new conversation", "path", path) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| if err := sendSongToTelegram(ctx, t.bot, chatID, path); err != nil { | ||
| xlog.Error("Error sending song", "error", err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Handle PDFs from generate_pdf action (local file paths) | ||
| if pdfPaths, exists := ccm.Metadata[actions.MetadataPDFs]; exists { | ||
| for _, path := range xstrings.UniqueSlice(pdfPaths.([]string)) { | ||
| data, err := os.ReadFile(path) | ||
| if err != nil { | ||
| xlog.Error("Error reading PDF file", "path", path, "error", err) | ||
| continue | ||
| } | ||
|
|
||
| filename := filepath.Base(path) | ||
| if filename == "" || filename == "." { | ||
| filename = "document.pdf" | ||
| } | ||
|
|
||
| xlog.Debug("Sending PDF document from new conversation", "filename", filename, "size", len(data)) | ||
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | ||
| _, err = t.bot.SendDocument(ctx, &bot.SendDocumentParams{ | ||
| ChatID: chatID, | ||
| Document: &models.InputFileUpload{ | ||
| Filename: filename, | ||
| Data: bytes.NewReader(data), | ||
| }, | ||
| Caption: "Generated PDF", | ||
| }) | ||
| if err != nil { | ||
| xlog.Error("Error sending PDF", "error", err) | ||
| } | ||
| } | ||
| } |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block duplicates the multimedia handling already implemented in handleMultimediaContent (same images/songs/pdfs logic). Consider extracting a shared helper for “send metadata attachments” and reuse it here to avoid divergent behavior/bugs when one path is updated and the other isn’t.
| // Handle images from gen image actions | |
| if imagesUrls, exists := ccm.Metadata[actions.MetadataImages]; exists { | |
| for _, url := range xstrings.UniqueSlice(imagesUrls.([]string)) { | |
| xlog.Debug("Sending photo from new conversation", "url", url) | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| if err := sendImageToTelegram(ctx, t.bot, chatID, url); err != nil { | |
| xlog.Error("Error handling image", "error", err) | |
| } | |
| } | |
| } | |
| // Handle songs from generate_song action (local file paths) | |
| if songPaths, exists := ccm.Metadata[actions.MetadataSongs]; exists { | |
| for _, path := range xstrings.UniqueSlice(songPaths.([]string)) { | |
| xlog.Debug("Sending song from new conversation", "path", path) | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| if err := sendSongToTelegram(ctx, t.bot, chatID, path); err != nil { | |
| xlog.Error("Error sending song", "error", err) | |
| } | |
| } | |
| } | |
| // Handle PDFs from generate_pdf action (local file paths) | |
| if pdfPaths, exists := ccm.Metadata[actions.MetadataPDFs]; exists { | |
| for _, path := range xstrings.UniqueSlice(pdfPaths.([]string)) { | |
| data, err := os.ReadFile(path) | |
| if err != nil { | |
| xlog.Error("Error reading PDF file", "path", path, "error", err) | |
| continue | |
| } | |
| filename := filepath.Base(path) | |
| if filename == "" || filename == "." { | |
| filename = "document.pdf" | |
| } | |
| xlog.Debug("Sending PDF document from new conversation", "filename", filename, "size", len(data)) | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| _, err = t.bot.SendDocument(ctx, &bot.SendDocumentParams{ | |
| ChatID: chatID, | |
| Document: &models.InputFileUpload{ | |
| Filename: filename, | |
| Data: bytes.NewReader(data), | |
| }, | |
| Caption: "Generated PDF", | |
| }) | |
| if err != nil { | |
| xlog.Error("Error sending PDF", "error", err) | |
| } | |
| } | |
| } | |
| chatID, _ := strconv.ParseInt(t.channelID, 10, 64) | |
| if err := handleMultimediaContent(ctx, t.bot, chatID, ccm.Metadata); err != nil { | |
| xlog.Error("Error handling multimedia content", "error", err) | |
| } |
| data, err := os.ReadFile(path) | ||
| if err != nil { | ||
| xlog.Error("Error reading PDF file", "path", path, "error", err) | ||
| continue | ||
| } |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading PDFs with os.ReadFile(path) loads the entire document into memory. Since these files can be user-generated and potentially large, add a size check (and/or stream upload if supported by the Telegram client) to avoid high memory usage or OOMs when sending PDFs from metadata.
| case msg := <-a.newConversations: | ||
| xlog.Debug("New conversation", "agent", a.Character.Name, "message", msg.Content) | ||
| xlog.Debug("New conversation", "agent", a.Character.Name, "message", msg.Message.Content) | ||
| a.subscriberMutex.Lock() |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
msg is dereferenced in the debug log (msg.Message.Content) before checking msg != nil, so a nil message sent on a.newConversations would panic. Move the nil check before any dereference/logging in this select case.
| job.Metadata = make(map[string]interface{}) | ||
| } | ||
| for key, value := range actionResult.Metadata { | ||
| job.Metadata[key] = value |
Copilot
AI
Feb 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The “merge” of actionResult.Metadata into job.Metadata overwrites existing keys, so multiple actions producing the same metadata key (e.g. multiple images_url results) will lose earlier values. If the intent is to accumulate attachments across actions, append/union slices for known keys (or implement a generic merge strategy) instead of unconditional overwrite.
| job.Metadata[key] = value | |
| // If the key does not exist yet, just set it. | |
| if existing, ok := job.Metadata[key]; ok { | |
| switch v := value.(type) { | |
| // Append []string values when both old and new are []string. | |
| case []string: | |
| if exSlice, ok := existing.([]string); ok { | |
| job.Metadata[key] = append(exSlice, v...) | |
| continue | |
| } | |
| // Append []interface{} values when both old and new are []interface{}. | |
| case []interface{}: | |
| if exSlice, ok := existing.([]interface{}); ok { | |
| job.Metadata[key] = append(exSlice, v...) | |
| continue | |
| } | |
| } | |
| // Fallback: overwrite (preserves previous behavior for non-slice types). | |
| job.Metadata[key] = value | |
| } else { | |
| job.Metadata[key] = value | |
| } |
No description provided.