mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-15 13:19:24 +02:00
* fix(bluebubbles): restore inbound image attachments and accept updated-message events Four interconnected fixes for BlueBubbles inbound media: 1. Strip bundled-undici dispatcher from non-SSRF fetch path so attachment downloads no longer silently fail on Node 22+ (#64105, #61861) 2. Accept updated-message webhook events that carry attachments instead of filtering them as non-reaction events (#65430) 3. Include eventType in the persistent GUID dedup key so updated-message follow-ups are not rejected as duplicates of the original new-message (#52277) 4. Retry attachment fetch from BB API (2s delay) when the initial webhook arrives with an empty attachments array — image-only messages and updated-message events only (#67437) Closes #64105, closes #61861, closes #65430. * fix(bluebubbles): resolve review findings — SSRF policy, reuse extractAttachments, add tests - F1 (BLOCKER): pass undefined instead of {} for SSRF policy when allowPrivateNetwork is false, so localhost BB servers are not blocked. - F2 (IMPORTANT): reuse exported extractAttachments() from monitor-normalize instead of duplicating field extraction logic. - F3 (IMPORTANT): simplify asRecord(asRecord(payload)?.data) to asRecord(payload.data) since payload is already Record<string, unknown>. - F4 (NIT): bind retryMessageId before the guard to eliminate non-null assertion. - F5 (IMPORTANT): add 4 tests for fetchBlueBubblesMessageAttachments covering success, non-ok HTTP, empty data, and guid-less entries. - Add CHANGELOG entry for the user-facing fix. * fix(ci): update raw-fetch allowlist line number after dispatcher strip * fix(bluebubbles): resolve PR review findings (#67510) - monitor-processing: move attachment retry into the !rawBody guard so image-only new-message events that arrive with empty attachments and empty text are recovered via a BB API refetch before being dropped. The existing retry block at the end of processMessageAfterDedupe was unreachable for this case because the !rawBody early-return fired first. (Greptile) - monitor: derive isAttachmentUpdate from the normalized message shape instead of raw payload.data.attachments so updated-message webhooks with attachments under wrapper formats (payload.message, JSON-string payloads) are correctly routed through for processing instead of silently filtered. (Codex) - types: use bundled-undici fetch when init.dispatcher is present so the SSRF guard's DNS-pinning dispatcher is preserved when this function is called as fetchImpl from guarded callers (e.g. the attachment download path via fetchRemoteMedia). Falls back to globalThis.fetch when no dispatcher is present so tests that stub globalThis.fetch keep working. (Codex) - attachments: blueBubblesPolicy returns undefined for the non-private case (matching monitor-processing's helper) so sendBlueBubblesAttachment stops routing localhost BB through the SSRF guard. (Greptile) - scripts/check-no-raw-channel-fetch: bump the types.ts allowlist line to match the restructured non-SSRF branch. * fix(bluebubbles): move attachment retry before rawBody guard, fix stale log Move the attachment retry block (2s BB API refetch for empty attachments) before the !rawBody early-return guard. Previously, image-only messages with text='' and attachments=[] would be dropped by the !rawBody check before the retry could fire, making fix #4 dead code for its primary use-case. Now the retry runs first and recomputes the placeholder from resolved attachments so rawBody becomes non-empty when media is found. Also fix stale log message that still said 'without reaction' after the filter was expanded to pass through attachment updates. * fix(bluebubbles): revert undici import, restore dispatcher-strip approach Revert the @claude bot's undici import in types.ts — it introduced a direct 'undici' dependency that is not declared in the BB extension's package.json and would break isolated plugin installs. Restore the original dispatcher-strip approach which is correct: the SSRF guard already completed validation upstream before calling this function as fetchImpl, so stripping the dispatcher does not weaken security. * fix(bluebubbles): remove dead empty-body recovery block in !rawBody guard The empty-body attachment-recovery block added in the earlier PR revision is now redundant because the main retry block was moved above the rawBody computation in0d7d1c4208. Worse, that leftover block reassigned the (now-const) placeholder variable, throwing `TypeError: Assignment to constant variable` at runtime for image-only messages — breaking the very recovery path it was meant to protect (flagged by Codex on 4bfc2777). Remove the dead block; the up-front retry already handles the image-only case by recovering attachments before the rawBody computation, so once we reach the !rawBody guard with an empty body it is genuinely empty and should drop as before. * fix(ci): update raw-fetch allowlist line after dispatcher-strip revert279dba17d2reverted types.ts back to the dispatcher-strip approach, which put the `fetch(url, ...)` call at line 189 instead of line 198. Bump the allowlist entry to match so `lint:tmp:no-raw-channel-fetch` stops failing check-additional. * test(pdf-tool): update stale opus-4-6 constant to opus-4-7 `628b454eff feat: default Anthropic to Opus 4.7` bumped the bundled anthropic image default to `claude-opus-4-7` but missed updating the `ANTHROPIC_PDF_MODEL` constant in pdf-tool.model-config.test.ts. The tests now fail on any PR that runs the `checks-node-agentic-agents-plugins` shard because the resolver returns 4-7 while the test asserts 4-6. Bump the constant to 4-7 to match the bundled default. --------- Co-authored-by: Lobster <10343873+omarshahine@users.noreply.github.com>
1897 lines
64 KiB
TypeScript
1897 lines
64 KiB
TypeScript
import {
|
|
resolveOutboundMediaUrls,
|
|
resolveTextChunksWithFallback,
|
|
sendMediaWithLeadingCaption,
|
|
} from "openclaw/plugin-sdk/reply-payload";
|
|
import { isPrivateNetworkOptInEnabled } from "openclaw/plugin-sdk/ssrf-runtime";
|
|
import {
|
|
normalizeLowercaseStringOrEmpty,
|
|
normalizeOptionalLowercaseString,
|
|
normalizeOptionalString,
|
|
} from "openclaw/plugin-sdk/text-runtime";
|
|
import {
|
|
downloadBlueBubblesAttachment,
|
|
fetchBlueBubblesMessageAttachments,
|
|
} from "./attachments.js";
|
|
import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js";
|
|
import { resolveBlueBubblesConversationRoute } from "./conversation-route.js";
|
|
import { fetchBlueBubblesHistory } from "./history.js";
|
|
import {
|
|
claimBlueBubblesInboundMessage,
|
|
resolveBlueBubblesInboundDedupeKey,
|
|
} from "./inbound-dedupe.js";
|
|
import { sendBlueBubblesMedia } from "./media-send.js";
|
|
import {
|
|
buildMessagePlaceholder,
|
|
formatGroupAllowlistEntry,
|
|
formatGroupMembers,
|
|
formatReplyTag,
|
|
normalizeParticipantList,
|
|
parseTapbackText,
|
|
resolveGroupFlagFromChatGuid,
|
|
resolveTapbackContext,
|
|
type NormalizedWebhookMessage,
|
|
type NormalizedWebhookReaction,
|
|
} from "./monitor-normalize.js";
|
|
import {
|
|
DM_GROUP_ACCESS_REASON,
|
|
createChannelPairingController,
|
|
createChannelReplyPipeline,
|
|
evictOldHistoryKeys,
|
|
evaluateSupplementalContextVisibility,
|
|
logAckFailure,
|
|
logInboundDrop,
|
|
logTypingFailure,
|
|
mapAllowFromEntries,
|
|
readStoreAllowFromForDmPolicy,
|
|
recordPendingHistoryEntryIfEnabled,
|
|
resolveAckReaction,
|
|
resolveChannelContextVisibilityMode,
|
|
resolveDmGroupAccessWithLists,
|
|
resolveControlCommandGate,
|
|
stripMarkdown,
|
|
type HistoryEntry,
|
|
} from "./monitor-processing-api.js";
|
|
import {
|
|
getShortIdForUuid,
|
|
rememberBlueBubblesReplyCache,
|
|
resolveBlueBubblesMessageId,
|
|
resolveReplyContextFromCache,
|
|
} from "./monitor-reply-cache.js";
|
|
import {
|
|
hasBlueBubblesSelfChatCopy,
|
|
rememberBlueBubblesSelfChatCopy,
|
|
} from "./monitor-self-chat-cache.js";
|
|
import type {
|
|
BlueBubblesCoreRuntime,
|
|
BlueBubblesRuntimeEnv,
|
|
WebhookTarget,
|
|
} from "./monitor-shared.js";
|
|
import { enrichBlueBubblesParticipantsWithContactNames } from "./participant-contact-names.js";
|
|
import { isBlueBubblesPrivateApiEnabled } from "./probe.js";
|
|
import { normalizeBlueBubblesReactionInput, sendBlueBubblesReaction } from "./reactions.js";
|
|
import type { OpenClawConfig } from "./runtime-api.js";
|
|
import { normalizeSecretInputString } from "./secret-input.js";
|
|
import { resolveChatGuidForTarget, sendMessageBlueBubbles } from "./send.js";
|
|
import {
|
|
extractHandleFromChatGuid,
|
|
formatBlueBubblesChatTarget,
|
|
isAllowedBlueBubblesSender,
|
|
normalizeBlueBubblesHandle,
|
|
} from "./targets.js";
|
|
import { blueBubblesFetchWithTimeout, buildBlueBubblesApiUrl } from "./types.js";
|
|
|
|
const DEFAULT_TEXT_LIMIT = 4000;
|
|
const invalidAckReactions = new Set<string>();
|
|
const REPLY_DIRECTIVE_TAG_RE = /\[\[\s*(?:reply_to_current|reply_to\s*:\s*[^\]\n]+)\s*\]\]/gi;
|
|
const PENDING_OUTBOUND_MESSAGE_ID_TTL_MS = 2 * 60 * 1000;
|
|
|
|
type PendingOutboundMessageId = {
|
|
id: number;
|
|
accountId: string;
|
|
sessionKey: string;
|
|
outboundTarget: string;
|
|
chatGuid?: string;
|
|
chatIdentifier?: string;
|
|
chatId?: number;
|
|
snippetRaw: string;
|
|
snippetNorm: string;
|
|
isMediaSnippet: boolean;
|
|
createdAt: number;
|
|
};
|
|
|
|
const pendingOutboundMessageIds: PendingOutboundMessageId[] = [];
|
|
let pendingOutboundMessageIdCounter = 0;
|
|
|
|
function normalizeSnippet(value: string): string {
|
|
return normalizeOptionalLowercaseString(stripMarkdown(value).replace(/\s+/g, " ")) ?? "";
|
|
}
|
|
|
|
type BlueBubblesChatRecord = Record<string, unknown>;
|
|
|
|
function blueBubblesPolicy(allowPrivateNetwork: boolean | undefined) {
|
|
return allowPrivateNetwork ? { allowPrivateNetwork: true } : undefined;
|
|
}
|
|
|
|
function extractBlueBubblesChatGuid(chat: BlueBubblesChatRecord): string | undefined {
|
|
const candidates = [chat.chatGuid, chat.guid, chat.chat_guid];
|
|
for (const candidate of candidates) {
|
|
if (typeof candidate === "string" && candidate.trim()) {
|
|
return candidate.trim();
|
|
}
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
function extractBlueBubblesChatId(chat: BlueBubblesChatRecord): number | undefined {
|
|
const candidates = [chat.chatId, chat.id, chat.chat_id];
|
|
for (const candidate of candidates) {
|
|
if (typeof candidate === "number" && Number.isFinite(candidate)) {
|
|
return candidate;
|
|
}
|
|
}
|
|
return undefined;
|
|
}
|
|
|
|
function extractChatIdentifierFromChatGuid(chatGuid: string): string | undefined {
|
|
const parts = chatGuid.split(";");
|
|
if (parts.length < 3) {
|
|
return undefined;
|
|
}
|
|
const identifier = parts[2]?.trim();
|
|
return identifier || undefined;
|
|
}
|
|
|
|
function extractBlueBubblesChatIdentifier(chat: BlueBubblesChatRecord): string | undefined {
|
|
const candidates = [chat.chatIdentifier, chat.chat_identifier, chat.identifier];
|
|
for (const candidate of candidates) {
|
|
if (typeof candidate === "string" && candidate.trim()) {
|
|
return candidate.trim();
|
|
}
|
|
}
|
|
const chatGuid = extractBlueBubblesChatGuid(chat);
|
|
return chatGuid ? extractChatIdentifierFromChatGuid(chatGuid) : undefined;
|
|
}
|
|
|
|
async function queryBlueBubblesChats(params: {
|
|
baseUrl: string;
|
|
password: string;
|
|
timeoutMs?: number;
|
|
offset: number;
|
|
limit: number;
|
|
allowPrivateNetwork?: boolean;
|
|
}): Promise<BlueBubblesChatRecord[]> {
|
|
const url = buildBlueBubblesApiUrl({
|
|
baseUrl: params.baseUrl,
|
|
path: "/api/v1/chat/query",
|
|
password: params.password,
|
|
});
|
|
const res = await blueBubblesFetchWithTimeout(
|
|
url,
|
|
{
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({
|
|
limit: params.limit,
|
|
offset: params.offset,
|
|
with: ["participants"],
|
|
}),
|
|
},
|
|
params.timeoutMs,
|
|
blueBubblesPolicy(params.allowPrivateNetwork),
|
|
);
|
|
if (!res.ok) {
|
|
return [];
|
|
}
|
|
const payload = (await res.json().catch(() => null)) as Record<string, unknown> | null;
|
|
const data = payload && typeof payload.data !== "undefined" ? (payload.data as unknown) : null;
|
|
return Array.isArray(data) ? (data as BlueBubblesChatRecord[]) : [];
|
|
}
|
|
|
|
async function fetchBlueBubblesParticipantsForInboundMessage(params: {
|
|
baseUrl: string;
|
|
password: string;
|
|
chatGuid?: string;
|
|
chatId?: number;
|
|
chatIdentifier?: string;
|
|
allowPrivateNetwork?: boolean;
|
|
}): Promise<import("./monitor-normalize.js").BlueBubblesParticipant[] | null> {
|
|
if (!params.chatGuid && params.chatId == null && !params.chatIdentifier) {
|
|
return null;
|
|
}
|
|
|
|
const limit = 500;
|
|
for (let offset = 0; offset < 5000; offset += limit) {
|
|
const chats = await queryBlueBubblesChats({
|
|
baseUrl: params.baseUrl,
|
|
password: params.password,
|
|
offset,
|
|
limit,
|
|
allowPrivateNetwork: params.allowPrivateNetwork,
|
|
});
|
|
if (chats.length === 0) {
|
|
return null;
|
|
}
|
|
|
|
for (const chat of chats) {
|
|
const chatGuid = extractBlueBubblesChatGuid(chat);
|
|
const chatId = extractBlueBubblesChatId(chat);
|
|
const chatIdentifier = extractBlueBubblesChatIdentifier(chat);
|
|
const matches =
|
|
(params.chatGuid && chatGuid === params.chatGuid) ||
|
|
(params.chatId != null && chatId === params.chatId) ||
|
|
(params.chatIdentifier &&
|
|
(chatIdentifier === params.chatIdentifier || chatGuid === params.chatIdentifier));
|
|
if (matches) {
|
|
return normalizeParticipantList(chat);
|
|
}
|
|
}
|
|
|
|
if (chats.length < limit) {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
function isBlueBubblesSelfChatMessage(
|
|
message: NormalizedWebhookMessage,
|
|
isGroup: boolean,
|
|
): boolean {
|
|
if (isGroup || !message.senderIdExplicit) {
|
|
return false;
|
|
}
|
|
const chatHandle =
|
|
(message.chatGuid ? extractHandleFromChatGuid(message.chatGuid) : null) ??
|
|
normalizeBlueBubblesHandle(message.chatIdentifier ?? "");
|
|
return Boolean(chatHandle) && chatHandle === message.senderId;
|
|
}
|
|
|
|
function prunePendingOutboundMessageIds(now = Date.now()): void {
|
|
const cutoff = now - PENDING_OUTBOUND_MESSAGE_ID_TTL_MS;
|
|
for (let i = pendingOutboundMessageIds.length - 1; i >= 0; i--) {
|
|
if (pendingOutboundMessageIds[i].createdAt < cutoff) {
|
|
pendingOutboundMessageIds.splice(i, 1);
|
|
}
|
|
}
|
|
}
|
|
|
|
function rememberPendingOutboundMessageId(entry: {
|
|
accountId: string;
|
|
sessionKey: string;
|
|
outboundTarget: string;
|
|
chatGuid?: string;
|
|
chatIdentifier?: string;
|
|
chatId?: number;
|
|
snippet: string;
|
|
}): number {
|
|
prunePendingOutboundMessageIds();
|
|
pendingOutboundMessageIdCounter += 1;
|
|
const snippetRaw = entry.snippet.trim();
|
|
const snippetNorm = normalizeSnippet(snippetRaw);
|
|
pendingOutboundMessageIds.push({
|
|
id: pendingOutboundMessageIdCounter,
|
|
accountId: entry.accountId,
|
|
sessionKey: entry.sessionKey,
|
|
outboundTarget: entry.outboundTarget,
|
|
chatGuid: normalizeOptionalString(entry.chatGuid),
|
|
chatIdentifier: normalizeOptionalString(entry.chatIdentifier),
|
|
chatId: typeof entry.chatId === "number" ? entry.chatId : undefined,
|
|
snippetRaw,
|
|
snippetNorm,
|
|
isMediaSnippet: normalizeLowercaseStringOrEmpty(snippetRaw).startsWith("<media:"),
|
|
createdAt: Date.now(),
|
|
});
|
|
return pendingOutboundMessageIdCounter;
|
|
}
|
|
|
|
function forgetPendingOutboundMessageId(id: number): void {
|
|
const index = pendingOutboundMessageIds.findIndex((entry) => entry.id === id);
|
|
if (index >= 0) {
|
|
pendingOutboundMessageIds.splice(index, 1);
|
|
}
|
|
}
|
|
|
|
function chatsMatch(
|
|
left: Pick<PendingOutboundMessageId, "chatGuid" | "chatIdentifier" | "chatId">,
|
|
right: { chatGuid?: string; chatIdentifier?: string; chatId?: number },
|
|
): boolean {
|
|
const leftGuid = normalizeOptionalString(left.chatGuid);
|
|
const rightGuid = normalizeOptionalString(right.chatGuid);
|
|
if (leftGuid && rightGuid) {
|
|
return leftGuid === rightGuid;
|
|
}
|
|
|
|
const leftIdentifier = normalizeOptionalString(left.chatIdentifier);
|
|
const rightIdentifier = normalizeOptionalString(right.chatIdentifier);
|
|
if (leftIdentifier && rightIdentifier) {
|
|
return leftIdentifier === rightIdentifier;
|
|
}
|
|
|
|
const leftChatId = typeof left.chatId === "number" ? left.chatId : undefined;
|
|
const rightChatId = typeof right.chatId === "number" ? right.chatId : undefined;
|
|
if (leftChatId !== undefined && rightChatId !== undefined) {
|
|
return leftChatId === rightChatId;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
function consumePendingOutboundMessageId(params: {
|
|
accountId: string;
|
|
chatGuid?: string;
|
|
chatIdentifier?: string;
|
|
chatId?: number;
|
|
body: string;
|
|
}): PendingOutboundMessageId | null {
|
|
prunePendingOutboundMessageIds();
|
|
const bodyNorm = normalizeSnippet(params.body);
|
|
const isMediaBody = normalizeLowercaseStringOrEmpty(params.body).startsWith("<media:");
|
|
|
|
for (let i = 0; i < pendingOutboundMessageIds.length; i++) {
|
|
const entry = pendingOutboundMessageIds[i];
|
|
if (entry.accountId !== params.accountId) {
|
|
continue;
|
|
}
|
|
if (!chatsMatch(entry, params)) {
|
|
continue;
|
|
}
|
|
if (entry.snippetNorm && entry.snippetNorm === bodyNorm) {
|
|
pendingOutboundMessageIds.splice(i, 1);
|
|
return entry;
|
|
}
|
|
if (entry.isMediaSnippet && isMediaBody) {
|
|
pendingOutboundMessageIds.splice(i, 1);
|
|
return entry;
|
|
}
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
export function logVerbose(
|
|
core: BlueBubblesCoreRuntime,
|
|
runtime: BlueBubblesRuntimeEnv,
|
|
message: string,
|
|
): void {
|
|
if (core.logging.shouldLogVerbose()) {
|
|
runtime.log?.(`[bluebubbles] ${message}`);
|
|
}
|
|
}
|
|
|
|
function logGroupAllowlistHint(params: {
|
|
runtime: BlueBubblesRuntimeEnv;
|
|
reason: string;
|
|
entry: string | null;
|
|
chatName?: string;
|
|
accountId?: string;
|
|
}): void {
|
|
const log = params.runtime.log ?? console.log;
|
|
const nameHint = params.chatName ? ` (group name: ${params.chatName})` : "";
|
|
const accountHint = params.accountId
|
|
? ` (or channels.bluebubbles.accounts.${params.accountId}.groupAllowFrom)`
|
|
: "";
|
|
if (params.entry) {
|
|
log(
|
|
`[bluebubbles] group message blocked (${params.reason}). Allow this group by adding ` +
|
|
`"${params.entry}" to channels.bluebubbles.groupAllowFrom${nameHint}.`,
|
|
);
|
|
log(
|
|
`[bluebubbles] add to config: channels.bluebubbles.groupAllowFrom=["${params.entry}"]${accountHint}.`,
|
|
);
|
|
return;
|
|
}
|
|
log(
|
|
`[bluebubbles] group message blocked (${params.reason}). Allow groups by setting ` +
|
|
`channels.bluebubbles.groupPolicy="open" or adding a group id to ` +
|
|
`channels.bluebubbles.groupAllowFrom${accountHint}${nameHint}.`,
|
|
);
|
|
}
|
|
|
|
function resolveBlueBubblesAckReaction(params: {
|
|
cfg: OpenClawConfig;
|
|
agentId: string;
|
|
core: BlueBubblesCoreRuntime;
|
|
runtime: BlueBubblesRuntimeEnv;
|
|
}): string | null {
|
|
const raw = resolveAckReaction(params.cfg, params.agentId).trim();
|
|
if (!raw) {
|
|
return null;
|
|
}
|
|
try {
|
|
normalizeBlueBubblesReactionInput(raw);
|
|
return raw;
|
|
} catch {
|
|
const key = normalizeLowercaseStringOrEmpty(raw);
|
|
if (!invalidAckReactions.has(key)) {
|
|
invalidAckReactions.add(key);
|
|
logVerbose(
|
|
params.core,
|
|
params.runtime,
|
|
`ack reaction skipped (unsupported for BlueBubbles): ${raw}`,
|
|
);
|
|
}
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* In-memory rolling history map keyed by account + chat identifier.
|
|
* Populated from incoming messages during the session.
|
|
* API backfill is attempted until one fetch resolves (or retries are exhausted).
|
|
*/
|
|
const chatHistories = new Map<string, HistoryEntry[]>();
|
|
type HistoryBackfillState = {
|
|
attempts: number;
|
|
firstAttemptAt: number;
|
|
nextAttemptAt: number;
|
|
resolved: boolean;
|
|
};
|
|
|
|
const historyBackfills = new Map<string, HistoryBackfillState>();
|
|
const HISTORY_BACKFILL_BASE_DELAY_MS = 5_000;
|
|
const HISTORY_BACKFILL_MAX_DELAY_MS = 2 * 60 * 1000;
|
|
const HISTORY_BACKFILL_MAX_ATTEMPTS = 6;
|
|
const HISTORY_BACKFILL_RETRY_WINDOW_MS = 30 * 60 * 1000;
|
|
const MAX_STORED_HISTORY_ENTRY_CHARS = 2_000;
|
|
const MAX_INBOUND_HISTORY_ENTRY_CHARS = 1_200;
|
|
const MAX_INBOUND_HISTORY_TOTAL_CHARS = 12_000;
|
|
|
|
function buildAccountScopedHistoryKey(accountId: string, historyIdentifier: string): string {
|
|
return `${accountId}\u0000${historyIdentifier}`;
|
|
}
|
|
|
|
function historyDedupKey(entry: HistoryEntry): string {
|
|
const messageId = entry.messageId?.trim();
|
|
if (messageId) {
|
|
return `id:${messageId}`;
|
|
}
|
|
return `fallback:${entry.sender}\u0000${entry.body}\u0000${entry.timestamp ?? ""}`;
|
|
}
|
|
|
|
function truncateHistoryBody(body: string, maxChars: number): string {
|
|
const trimmed = body.trim();
|
|
if (!trimmed) {
|
|
return "";
|
|
}
|
|
if (trimmed.length <= maxChars) {
|
|
return trimmed;
|
|
}
|
|
return `${trimmed.slice(0, maxChars).trimEnd()}...`;
|
|
}
|
|
|
|
function mergeHistoryEntries(params: {
|
|
apiEntries: HistoryEntry[];
|
|
currentEntries: HistoryEntry[];
|
|
limit: number;
|
|
}): HistoryEntry[] {
|
|
if (params.limit <= 0) {
|
|
return [];
|
|
}
|
|
|
|
const merged: HistoryEntry[] = [];
|
|
const seen = new Set<string>();
|
|
const appendUnique = (entry: HistoryEntry) => {
|
|
const key = historyDedupKey(entry);
|
|
if (seen.has(key)) {
|
|
return;
|
|
}
|
|
seen.add(key);
|
|
merged.push(entry);
|
|
};
|
|
|
|
for (const entry of params.apiEntries) {
|
|
appendUnique(entry);
|
|
}
|
|
for (const entry of params.currentEntries) {
|
|
appendUnique(entry);
|
|
}
|
|
|
|
if (merged.length <= params.limit) {
|
|
return merged;
|
|
}
|
|
return merged.slice(merged.length - params.limit);
|
|
}
|
|
|
|
function pruneHistoryBackfillState(): void {
|
|
for (const key of historyBackfills.keys()) {
|
|
if (!chatHistories.has(key)) {
|
|
historyBackfills.delete(key);
|
|
}
|
|
}
|
|
}
|
|
|
|
function markHistoryBackfillResolved(historyKey: string): void {
|
|
const state = historyBackfills.get(historyKey);
|
|
if (state) {
|
|
state.resolved = true;
|
|
historyBackfills.set(historyKey, state);
|
|
return;
|
|
}
|
|
historyBackfills.set(historyKey, {
|
|
attempts: 0,
|
|
firstAttemptAt: Date.now(),
|
|
nextAttemptAt: Number.POSITIVE_INFINITY,
|
|
resolved: true,
|
|
});
|
|
}
|
|
|
|
function planHistoryBackfillAttempt(historyKey: string, now: number): HistoryBackfillState | null {
|
|
const existing = historyBackfills.get(historyKey);
|
|
if (existing?.resolved) {
|
|
return null;
|
|
}
|
|
if (existing && now - existing.firstAttemptAt > HISTORY_BACKFILL_RETRY_WINDOW_MS) {
|
|
markHistoryBackfillResolved(historyKey);
|
|
return null;
|
|
}
|
|
if (existing && existing.attempts >= HISTORY_BACKFILL_MAX_ATTEMPTS) {
|
|
markHistoryBackfillResolved(historyKey);
|
|
return null;
|
|
}
|
|
if (existing && now < existing.nextAttemptAt) {
|
|
return null;
|
|
}
|
|
|
|
const attempts = (existing?.attempts ?? 0) + 1;
|
|
const firstAttemptAt = existing?.firstAttemptAt ?? now;
|
|
const backoffDelay = Math.min(
|
|
HISTORY_BACKFILL_BASE_DELAY_MS * 2 ** (attempts - 1),
|
|
HISTORY_BACKFILL_MAX_DELAY_MS,
|
|
);
|
|
const state: HistoryBackfillState = {
|
|
attempts,
|
|
firstAttemptAt,
|
|
nextAttemptAt: now + backoffDelay,
|
|
resolved: false,
|
|
};
|
|
historyBackfills.set(historyKey, state);
|
|
return state;
|
|
}
|
|
|
|
function buildInboundHistorySnapshot(params: {
|
|
entries: HistoryEntry[];
|
|
limit: number;
|
|
}): Array<{ sender: string; body: string; timestamp?: number }> | undefined {
|
|
if (params.limit <= 0 || params.entries.length === 0) {
|
|
return undefined;
|
|
}
|
|
const recent = params.entries.slice(-params.limit);
|
|
const selected: Array<{ sender: string; body: string; timestamp?: number }> = [];
|
|
let remainingChars = MAX_INBOUND_HISTORY_TOTAL_CHARS;
|
|
|
|
for (let i = recent.length - 1; i >= 0; i--) {
|
|
const entry = recent[i];
|
|
const body = truncateHistoryBody(entry.body, MAX_INBOUND_HISTORY_ENTRY_CHARS);
|
|
if (!body) {
|
|
continue;
|
|
}
|
|
if (selected.length > 0 && body.length > remainingChars) {
|
|
break;
|
|
}
|
|
selected.push({
|
|
sender: entry.sender,
|
|
body,
|
|
timestamp: entry.timestamp,
|
|
});
|
|
remainingChars -= body.length;
|
|
if (remainingChars <= 0) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (selected.length === 0) {
|
|
return undefined;
|
|
}
|
|
selected.reverse();
|
|
return selected;
|
|
}
|
|
|
|
function sanitizeForLog(value: unknown, maxLen = 200): string {
|
|
const cleaned = String(value).replace(/[\r\n\t\p{C}]/gu, " ");
|
|
return cleaned.length > maxLen ? cleaned.slice(0, maxLen) + "..." : cleaned;
|
|
}
|
|
|
|
/**
|
|
* Signal object threaded through `processMessageAfterDedupe` so the outer
|
|
* wrapper can distinguish "reply delivery failed silently" from "returned
|
|
* normally after an intentional drop" (fromMe cache, pairing flow, allowlist
|
|
* block, empty text, etc.).
|
|
*
|
|
* Reply delivery errors in the BlueBubbles path surface through the
|
|
* dispatcher's `onError` callback rather than as thrown exceptions, so a
|
|
* plain try/catch cannot detect them — see review thread `rwF8` on #66230.
|
|
*/
|
|
type InboundDedupeDeliverySignal = { deliveryFailed: boolean };
|
|
|
|
/**
|
|
* Claim → process → finalize/release wrapper around the real inbound flow.
|
|
*
|
|
* Claim before doing any work so restart replays and in-flight concurrent
|
|
* redeliveries both drop cleanly. Finalize (persist the GUID) only when
|
|
* processing completed cleanly AND any reply dispatch reported success;
|
|
* release (let a later replay try again) when processing threw OR the reply
|
|
* pipeline reported a delivery failure via its onError callback.
|
|
*
|
|
* The dedupe key follows the same canonicalization rules as the debouncer
|
|
* (`monitor-debounce.ts`): balloon events (URL previews, stickers) share
|
|
* a logical identity with their originating text message via
|
|
* `associatedMessageGuid`, so balloon-first vs text-first event ordering
|
|
* cannot produce two distinct dedupe keys for the same logical message.
|
|
*/
|
|
export async function processMessage(
|
|
message: NormalizedWebhookMessage,
|
|
target: WebhookTarget,
|
|
): Promise<void> {
|
|
const { account, core, runtime } = target;
|
|
|
|
const dedupeKey = resolveBlueBubblesInboundDedupeKey(message);
|
|
|
|
// Drop BlueBubbles MessagePoller replays after server restart (#19176, #12053).
|
|
const claim = await claimBlueBubblesInboundMessage({
|
|
guid: dedupeKey,
|
|
accountId: account.accountId,
|
|
onDiskError: (error) =>
|
|
logVerbose(core, runtime, `inbound-dedupe disk error: ${sanitizeForLog(error)}`),
|
|
});
|
|
if (claim.kind === "duplicate" || claim.kind === "inflight") {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`drop: ${claim.kind} inbound key=${sanitizeForLog(dedupeKey ?? "")} sender=${sanitizeForLog(message.senderId)}`,
|
|
);
|
|
return;
|
|
}
|
|
|
|
const signal: InboundDedupeDeliverySignal = { deliveryFailed: false };
|
|
try {
|
|
await processMessageAfterDedupe(message, target, signal);
|
|
} catch (error) {
|
|
if (claim.kind === "claimed") {
|
|
claim.release();
|
|
}
|
|
throw error;
|
|
}
|
|
if (claim.kind === "claimed") {
|
|
if (signal.deliveryFailed) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`inbound-dedupe: releasing claim for key=${sanitizeForLog(dedupeKey ?? "")} after reply delivery failure (will retry on replay)`,
|
|
);
|
|
claim.release();
|
|
} else {
|
|
try {
|
|
await claim.finalize();
|
|
} catch (finalizeError) {
|
|
// commit() already clears inflight state in its finally block, so
|
|
// no explicit release() needed here — just log the persistence error.
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`inbound-dedupe: finalize failed for key=${sanitizeForLog(dedupeKey ?? "")}: ${sanitizeForLog(finalizeError)}`,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async function processMessageAfterDedupe(
|
|
message: NormalizedWebhookMessage,
|
|
target: WebhookTarget,
|
|
dedupeSignal: InboundDedupeDeliverySignal,
|
|
): Promise<void> {
|
|
const { account, config, runtime, core, statusSink } = target;
|
|
|
|
const pairing = createChannelPairingController({
|
|
core,
|
|
channel: "bluebubbles",
|
|
accountId: account.accountId,
|
|
});
|
|
const privateApiEnabled = isBlueBubblesPrivateApiEnabled(account.accountId);
|
|
|
|
const groupFlag = resolveGroupFlagFromChatGuid(message.chatGuid);
|
|
const isGroup = typeof groupFlag === "boolean" ? groupFlag : message.isGroup;
|
|
|
|
const text = message.text.trim();
|
|
let attachments = message.attachments ?? [];
|
|
const baseUrl = normalizeSecretInputString(account.config.serverUrl);
|
|
const password = normalizeSecretInputString(account.config.password);
|
|
|
|
// BlueBubbles may fire the webhook before attachment indexing is complete,
|
|
// so the initial `attachments` array can be empty for messages that actually
|
|
// have media. When the message text is empty (image-only) or this is an
|
|
// `updated-message` event, wait briefly and re-fetch from the BB API as a
|
|
// fallback for cases where BB doesn't send a follow-up webhook. (#65430, #67437)
|
|
// This must run before the !rawBody guard below, otherwise image-only messages
|
|
// with empty attachments are dropped before the retry can fire.
|
|
const retryMessageId = message.messageId?.trim();
|
|
const shouldRetryAttachments =
|
|
attachments.length === 0 &&
|
|
retryMessageId &&
|
|
baseUrl &&
|
|
password &&
|
|
(text.length === 0 || message.eventType === "updated-message");
|
|
if (shouldRetryAttachments) {
|
|
try {
|
|
await new Promise<void>((resolve) => setTimeout(resolve, 2_000));
|
|
const fetched = await fetchBlueBubblesMessageAttachments(retryMessageId, {
|
|
baseUrl,
|
|
password,
|
|
timeoutMs: 10_000,
|
|
allowPrivateNetwork: isPrivateNetworkOptInEnabled(account.config),
|
|
});
|
|
if (fetched.length > 0) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`attachment retry found ${fetched.length} attachment(s) for msgId=${message.messageId}`,
|
|
);
|
|
attachments = fetched;
|
|
}
|
|
} catch (err) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`attachment retry failed for msgId=${message.messageId}: ${String(err)}`,
|
|
);
|
|
}
|
|
}
|
|
|
|
// Recompute placeholder from resolved attachments (may have been updated by retry).
|
|
const placeholder = buildMessagePlaceholder({ ...message, attachments });
|
|
// Check if text is a tapback pattern (e.g., 'Loved "hello"') and transform to emoji format
|
|
// For tapbacks, we'll append [[reply_to:N]] at the end; for regular messages, prepend it
|
|
const tapbackContext = resolveTapbackContext(message);
|
|
const tapbackParsed = parseTapbackText({
|
|
text,
|
|
emojiHint: tapbackContext?.emojiHint,
|
|
actionHint: tapbackContext?.actionHint,
|
|
requireQuoted: !tapbackContext,
|
|
});
|
|
const isTapbackMessage = Boolean(tapbackParsed);
|
|
const rawBody = tapbackParsed
|
|
? tapbackParsed.action === "removed"
|
|
? `removed ${tapbackParsed.emoji} reaction`
|
|
: `reacted with ${tapbackParsed.emoji}`
|
|
: text || placeholder;
|
|
const isSelfChatMessage = isBlueBubblesSelfChatMessage(message, isGroup);
|
|
const selfChatLookup = {
|
|
accountId: account.accountId,
|
|
chatGuid: message.chatGuid,
|
|
chatIdentifier: message.chatIdentifier,
|
|
chatId: message.chatId,
|
|
senderId: message.senderId,
|
|
body: rawBody,
|
|
timestamp: message.timestamp,
|
|
};
|
|
|
|
const cacheMessageId = message.messageId?.trim();
|
|
const confirmedOutboundCacheEntry = cacheMessageId
|
|
? resolveReplyContextFromCache({
|
|
accountId: account.accountId,
|
|
replyToId: cacheMessageId,
|
|
chatGuid: message.chatGuid,
|
|
chatIdentifier: message.chatIdentifier,
|
|
chatId: message.chatId,
|
|
})
|
|
: null;
|
|
let messageShortId: string | undefined;
|
|
const cacheInboundMessage = () => {
|
|
if (!cacheMessageId) {
|
|
return;
|
|
}
|
|
const cacheEntry = rememberBlueBubblesReplyCache({
|
|
accountId: account.accountId,
|
|
messageId: cacheMessageId,
|
|
chatGuid: message.chatGuid,
|
|
chatIdentifier: message.chatIdentifier,
|
|
chatId: message.chatId,
|
|
senderLabel: message.fromMe ? "me" : message.senderId,
|
|
body: rawBody,
|
|
timestamp: message.timestamp ?? Date.now(),
|
|
});
|
|
messageShortId = cacheEntry.shortId;
|
|
};
|
|
|
|
if (message.fromMe) {
|
|
// Cache from-me messages so reply context can resolve sender/body.
|
|
cacheInboundMessage();
|
|
const confirmedAssistantOutbound =
|
|
confirmedOutboundCacheEntry?.senderLabel === "me" &&
|
|
normalizeSnippet(confirmedOutboundCacheEntry.body ?? "") === normalizeSnippet(rawBody);
|
|
if (isSelfChatMessage && confirmedAssistantOutbound) {
|
|
rememberBlueBubblesSelfChatCopy(selfChatLookup);
|
|
}
|
|
if (cacheMessageId) {
|
|
const pending = consumePendingOutboundMessageId({
|
|
accountId: account.accountId,
|
|
chatGuid: message.chatGuid,
|
|
chatIdentifier: message.chatIdentifier,
|
|
chatId: message.chatId,
|
|
body: rawBody,
|
|
});
|
|
if (pending) {
|
|
const displayId = getShortIdForUuid(cacheMessageId) || cacheMessageId;
|
|
const previewSource = pending.snippetRaw || rawBody;
|
|
const preview = previewSource
|
|
? ` "${previewSource.slice(0, 12)}${previewSource.length > 12 ? "…" : ""}"`
|
|
: "";
|
|
core.system.enqueueSystemEvent(`Assistant sent${preview} [message_id:${displayId}]`, {
|
|
sessionKey: pending.sessionKey,
|
|
contextKey: `bluebubbles:outbound:${pending.outboundTarget}:${cacheMessageId}`,
|
|
});
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (isSelfChatMessage && hasBlueBubblesSelfChatCopy(selfChatLookup)) {
|
|
logVerbose(core, runtime, `drop: reflected self-chat duplicate sender=${message.senderId}`);
|
|
return;
|
|
}
|
|
|
|
if (!rawBody) {
|
|
logVerbose(core, runtime, `drop: empty text sender=${message.senderId}`);
|
|
return;
|
|
}
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`msg sender=${message.senderId} group=${isGroup} textLen=${text.length} attachments=${attachments.length} chatGuid=${message.chatGuid ?? ""} chatId=${message.chatId ?? ""}`,
|
|
);
|
|
|
|
const dmPolicy = account.config.dmPolicy ?? "pairing";
|
|
const groupPolicy = account.config.groupPolicy ?? "allowlist";
|
|
const configuredAllowFrom = mapAllowFromEntries(account.config.allowFrom);
|
|
const storeAllowFrom = await readStoreAllowFromForDmPolicy({
|
|
provider: "bluebubbles",
|
|
accountId: account.accountId,
|
|
dmPolicy,
|
|
readStore: pairing.readStoreForDmPolicy,
|
|
});
|
|
const accessDecision = resolveDmGroupAccessWithLists({
|
|
isGroup,
|
|
dmPolicy,
|
|
groupPolicy,
|
|
allowFrom: configuredAllowFrom,
|
|
groupAllowFrom: account.config.groupAllowFrom,
|
|
storeAllowFrom,
|
|
isSenderAllowed: (allowFrom) =>
|
|
isAllowedBlueBubblesSender({
|
|
allowFrom,
|
|
sender: message.senderId,
|
|
chatId: message.chatId ?? undefined,
|
|
chatGuid: message.chatGuid ?? undefined,
|
|
chatIdentifier: message.chatIdentifier ?? undefined,
|
|
}),
|
|
});
|
|
const effectiveAllowFrom = accessDecision.effectiveAllowFrom;
|
|
const effectiveGroupAllowFrom = accessDecision.effectiveGroupAllowFrom;
|
|
const groupAllowEntry = formatGroupAllowlistEntry({
|
|
chatGuid: message.chatGuid,
|
|
chatId: message.chatId ?? undefined,
|
|
chatIdentifier: message.chatIdentifier ?? undefined,
|
|
});
|
|
const groupName = normalizeOptionalString(message.chatName);
|
|
|
|
if (accessDecision.decision !== "allow") {
|
|
if (isGroup) {
|
|
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_DISABLED) {
|
|
logVerbose(core, runtime, "Blocked BlueBubbles group message (groupPolicy=disabled)");
|
|
logGroupAllowlistHint({
|
|
runtime,
|
|
reason: "groupPolicy=disabled",
|
|
entry: groupAllowEntry,
|
|
chatName: groupName,
|
|
accountId: account.accountId,
|
|
});
|
|
return;
|
|
}
|
|
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) {
|
|
logVerbose(core, runtime, "Blocked BlueBubbles group message (no allowlist)");
|
|
logGroupAllowlistHint({
|
|
runtime,
|
|
reason: "groupPolicy=allowlist (empty allowlist)",
|
|
entry: groupAllowEntry,
|
|
chatName: groupName,
|
|
accountId: account.accountId,
|
|
});
|
|
return;
|
|
}
|
|
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`Blocked BlueBubbles sender ${message.senderId} (not in groupAllowFrom)`,
|
|
);
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`drop: group sender not allowed sender=${message.senderId} allowFrom=${effectiveGroupAllowFrom.join(",")}`,
|
|
);
|
|
logGroupAllowlistHint({
|
|
runtime,
|
|
reason: "groupPolicy=allowlist (not allowlisted)",
|
|
entry: groupAllowEntry,
|
|
chatName: groupName,
|
|
accountId: account.accountId,
|
|
});
|
|
return;
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) {
|
|
logVerbose(core, runtime, `Blocked BlueBubbles DM from ${message.senderId}`);
|
|
logVerbose(core, runtime, `drop: dmPolicy disabled sender=${message.senderId}`);
|
|
return;
|
|
}
|
|
|
|
if (accessDecision.decision === "pairing") {
|
|
await pairing.issueChallenge({
|
|
senderId: message.senderId,
|
|
senderIdLine: `Your BlueBubbles sender id: ${message.senderId}`,
|
|
meta: { name: message.senderName },
|
|
onCreated: () => {
|
|
runtime.log?.(`[bluebubbles] pairing request sender=${message.senderId} created=true`);
|
|
logVerbose(core, runtime, `bluebubbles pairing request sender=${message.senderId}`);
|
|
},
|
|
sendPairingReply: async (text) => {
|
|
await sendMessageBlueBubbles(message.senderId, text, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
});
|
|
statusSink?.({ lastOutboundAt: Date.now() });
|
|
},
|
|
onReplyError: (err) => {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`bluebubbles pairing reply failed for ${message.senderId}: ${String(err)}`,
|
|
);
|
|
runtime.error?.(
|
|
`[bluebubbles] pairing reply failed sender=${message.senderId}: ${String(err)}`,
|
|
);
|
|
},
|
|
});
|
|
return;
|
|
}
|
|
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`Blocked unauthorized BlueBubbles sender ${message.senderId} (dmPolicy=${dmPolicy})`,
|
|
);
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`drop: dm sender not allowed sender=${message.senderId} allowFrom=${effectiveAllowFrom.join(",")}`,
|
|
);
|
|
return;
|
|
}
|
|
|
|
const chatId = message.chatId ?? undefined;
|
|
const chatGuid = message.chatGuid ?? undefined;
|
|
const chatIdentifier = message.chatIdentifier ?? undefined;
|
|
const peerId = isGroup
|
|
? (chatGuid ?? chatIdentifier ?? (chatId ? String(chatId) : "group"))
|
|
: message.senderId;
|
|
|
|
const route = resolveBlueBubblesConversationRoute({
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
isGroup,
|
|
peerId,
|
|
sender: message.senderId,
|
|
chatId,
|
|
chatGuid,
|
|
chatIdentifier,
|
|
});
|
|
const contextVisibilityMode = resolveChannelContextVisibilityMode({
|
|
cfg: config,
|
|
channel: "bluebubbles",
|
|
accountId: account.accountId,
|
|
});
|
|
|
|
// Mention gating for group chats (parity with iMessage/WhatsApp)
|
|
const messageText = text;
|
|
const mentionRegexes = core.channel.mentions.buildMentionRegexes(config, route.agentId);
|
|
const wasMentioned = isGroup
|
|
? core.channel.mentions.matchesMentionPatterns(messageText, mentionRegexes)
|
|
: true;
|
|
const canDetectMention = mentionRegexes.length > 0;
|
|
const requireMention = core.channel.groups.resolveRequireMention({
|
|
cfg: config,
|
|
channel: "bluebubbles",
|
|
groupId: peerId,
|
|
accountId: account.accountId,
|
|
});
|
|
|
|
// Command gating (parity with iMessage/WhatsApp)
|
|
const useAccessGroups = config.commands?.useAccessGroups !== false;
|
|
const hasControlCmd = core.channel.text.hasControlCommand(messageText, config);
|
|
const commandDmAllowFrom = isGroup ? configuredAllowFrom : effectiveAllowFrom;
|
|
const ownerAllowedForCommands =
|
|
commandDmAllowFrom.length > 0
|
|
? isAllowedBlueBubblesSender({
|
|
allowFrom: commandDmAllowFrom,
|
|
sender: message.senderId,
|
|
chatId: message.chatId ?? undefined,
|
|
chatGuid: message.chatGuid ?? undefined,
|
|
chatIdentifier: message.chatIdentifier ?? undefined,
|
|
})
|
|
: false;
|
|
const groupAllowedForCommands =
|
|
effectiveGroupAllowFrom.length > 0
|
|
? isAllowedBlueBubblesSender({
|
|
allowFrom: effectiveGroupAllowFrom,
|
|
sender: message.senderId,
|
|
chatId: message.chatId ?? undefined,
|
|
chatGuid: message.chatGuid ?? undefined,
|
|
chatIdentifier: message.chatIdentifier ?? undefined,
|
|
})
|
|
: false;
|
|
const commandGate = resolveControlCommandGate({
|
|
useAccessGroups,
|
|
authorizers: [
|
|
{ configured: commandDmAllowFrom.length > 0, allowed: ownerAllowedForCommands },
|
|
{ configured: effectiveGroupAllowFrom.length > 0, allowed: groupAllowedForCommands },
|
|
],
|
|
allowTextCommands: true,
|
|
hasControlCommand: hasControlCmd,
|
|
});
|
|
const commandAuthorized = commandGate.commandAuthorized;
|
|
|
|
// Block control commands from unauthorized senders in groups
|
|
if (isGroup && commandGate.shouldBlock) {
|
|
logInboundDrop({
|
|
log: (msg) => logVerbose(core, runtime, msg),
|
|
channel: "bluebubbles",
|
|
reason: "control command (unauthorized)",
|
|
target: message.senderId,
|
|
});
|
|
return;
|
|
}
|
|
|
|
// Allow control commands to bypass mention gating when authorized (parity with iMessage)
|
|
const shouldBypassMention =
|
|
isGroup && requireMention && !wasMentioned && commandAuthorized && hasControlCmd;
|
|
const effectiveWasMentioned = wasMentioned || shouldBypassMention;
|
|
|
|
// Skip group messages that require mention but weren't mentioned
|
|
if (isGroup && requireMention && canDetectMention && !wasMentioned && !shouldBypassMention) {
|
|
logVerbose(core, runtime, `bluebubbles: skipping group message (no mention)`);
|
|
return;
|
|
}
|
|
|
|
if (isGroup && !message.participants?.length && baseUrl && password) {
|
|
try {
|
|
const fetchedParticipants = await fetchBlueBubblesParticipantsForInboundMessage({
|
|
baseUrl,
|
|
password,
|
|
chatGuid: message.chatGuid,
|
|
chatId: message.chatId,
|
|
chatIdentifier: message.chatIdentifier,
|
|
allowPrivateNetwork: isPrivateNetworkOptInEnabled(account.config),
|
|
});
|
|
if (fetchedParticipants?.length) {
|
|
message.participants = fetchedParticipants;
|
|
}
|
|
} catch (err) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`bluebubbles: participant fallback lookup failed chat=${peerId}: ${String(err)}`,
|
|
);
|
|
}
|
|
}
|
|
|
|
if (
|
|
isGroup &&
|
|
account.config.enrichGroupParticipantsFromContacts === true &&
|
|
message.participants?.length
|
|
) {
|
|
// BlueBubbles only gives us participant handles, so enrich phone numbers from local Contacts
|
|
// after access, command, and mention gating have already allowed the message through.
|
|
message.participants = await enrichBlueBubblesParticipantsWithContactNames(
|
|
message.participants,
|
|
);
|
|
}
|
|
|
|
// Cache allowed inbound messages so later replies can resolve sender/body without
|
|
// surfacing dropped content (allowlist/mention/command gating).
|
|
cacheInboundMessage();
|
|
|
|
const maxBytes =
|
|
account.config.mediaMaxMb && account.config.mediaMaxMb > 0
|
|
? account.config.mediaMaxMb * 1024 * 1024
|
|
: 8 * 1024 * 1024;
|
|
|
|
let mediaUrls: string[] = [];
|
|
let mediaPaths: string[] = [];
|
|
let mediaTypes: string[] = [];
|
|
if (attachments.length > 0) {
|
|
if (!baseUrl || !password) {
|
|
logVerbose(core, runtime, "attachment download skipped (missing serverUrl/password)");
|
|
} else {
|
|
for (const attachment of attachments) {
|
|
if (!attachment.guid) {
|
|
continue;
|
|
}
|
|
if (attachment.totalBytes && attachment.totalBytes > maxBytes) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`attachment too large guid=${attachment.guid} bytes=${attachment.totalBytes}`,
|
|
);
|
|
continue;
|
|
}
|
|
try {
|
|
const downloaded = await downloadBlueBubblesAttachment(attachment, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
maxBytes,
|
|
});
|
|
const saved = await core.channel.media.saveMediaBuffer(
|
|
Buffer.from(downloaded.buffer),
|
|
downloaded.contentType,
|
|
"inbound",
|
|
maxBytes,
|
|
);
|
|
mediaPaths.push(saved.path);
|
|
mediaUrls.push(saved.path);
|
|
if (saved.contentType) {
|
|
mediaTypes.push(saved.contentType);
|
|
}
|
|
} catch (err) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`attachment download failed guid=${attachment.guid} err=${String(err)}`,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
let replyToId = message.replyToId;
|
|
let replyToBody = message.replyToBody;
|
|
let replyToSender = message.replyToSender;
|
|
let replyToShortId: string | undefined;
|
|
|
|
if (isTapbackMessage && tapbackContext?.replyToId) {
|
|
replyToId = tapbackContext.replyToId;
|
|
}
|
|
|
|
if (replyToId) {
|
|
const cached = resolveReplyContextFromCache({
|
|
accountId: account.accountId,
|
|
replyToId,
|
|
chatGuid: message.chatGuid,
|
|
chatIdentifier: message.chatIdentifier,
|
|
chatId: message.chatId,
|
|
});
|
|
if (cached) {
|
|
if (!replyToBody && cached.body) {
|
|
replyToBody = cached.body;
|
|
}
|
|
if (!replyToSender && cached.senderLabel) {
|
|
replyToSender = cached.senderLabel;
|
|
}
|
|
replyToShortId = cached.shortId;
|
|
if (core.logging.shouldLogVerbose()) {
|
|
const preview = (cached.body ?? "").replace(/\s+/g, " ").slice(0, 120);
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`reply-context cache hit replyToId=${replyToId} sender=${replyToSender ?? ""} body="${preview}"`,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
// If no cached short ID, try to get one from the UUID directly
|
|
if (replyToId && !replyToShortId) {
|
|
replyToShortId = getShortIdForUuid(replyToId);
|
|
}
|
|
const hasReplyContext = Boolean(replyToId || replyToBody || replyToSender);
|
|
const replySenderAllowed =
|
|
!isGroup || effectiveGroupAllowFrom.length === 0
|
|
? true
|
|
: replyToSender
|
|
? isAllowedBlueBubblesSender({
|
|
allowFrom: effectiveGroupAllowFrom,
|
|
sender: replyToSender,
|
|
chatId: message.chatId ?? undefined,
|
|
chatGuid: message.chatGuid ?? undefined,
|
|
chatIdentifier: message.chatIdentifier ?? undefined,
|
|
})
|
|
: false;
|
|
const includeReplyContext =
|
|
!hasReplyContext ||
|
|
evaluateSupplementalContextVisibility({
|
|
mode: contextVisibilityMode,
|
|
kind: "quote",
|
|
senderAllowed: replySenderAllowed,
|
|
}).include;
|
|
if (hasReplyContext && !includeReplyContext && isGroup) {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`bluebubbles: drop reply context (mode=${contextVisibilityMode}, sender_allowed=${replySenderAllowed ? "yes" : "no"})`,
|
|
);
|
|
}
|
|
const visibleReplyToId = includeReplyContext ? replyToId : undefined;
|
|
const visibleReplyToShortId = includeReplyContext ? replyToShortId : undefined;
|
|
const visibleReplyToBody = includeReplyContext ? replyToBody : undefined;
|
|
const visibleReplyToSender = includeReplyContext ? replyToSender : undefined;
|
|
|
|
// Use inline [[reply_to:N]] tag format
|
|
// For tapbacks/reactions: append at end (e.g., "reacted with ❤️ [[reply_to:4]]")
|
|
// For regular replies: prepend at start (e.g., "[[reply_to:4]] Awesome")
|
|
const replyTag = formatReplyTag({
|
|
replyToId: visibleReplyToId,
|
|
replyToShortId: visibleReplyToShortId,
|
|
});
|
|
const baseBody = replyTag
|
|
? isTapbackMessage
|
|
? `${rawBody} ${replyTag}`
|
|
: `${replyTag} ${rawBody}`
|
|
: rawBody;
|
|
// Build fromLabel the same way as iMessage/Signal (formatInboundFromLabel):
|
|
// group label + id for groups, sender for DMs.
|
|
// The sender identity is included in the envelope body via formatInboundEnvelope.
|
|
const senderLabel = message.senderName || `user:${message.senderId}`;
|
|
const fromLabel = isGroup
|
|
? `${normalizeOptionalString(message.chatName) || "Group"} id:${peerId}`
|
|
: senderLabel !== message.senderId
|
|
? `${senderLabel} id:${message.senderId}`
|
|
: senderLabel;
|
|
const groupSubject = isGroup ? normalizeOptionalString(message.chatName) : undefined;
|
|
const groupMembers = isGroup
|
|
? formatGroupMembers({
|
|
participants: message.participants,
|
|
fallback: message.senderId ? { id: message.senderId, name: message.senderName } : undefined,
|
|
})
|
|
: undefined;
|
|
const storePath = core.channel.session.resolveStorePath(config.session?.store, {
|
|
agentId: route.agentId,
|
|
});
|
|
const envelopeOptions = core.channel.reply.resolveEnvelopeFormatOptions(config);
|
|
const previousTimestamp = core.channel.session.readSessionUpdatedAt({
|
|
storePath,
|
|
sessionKey: route.sessionKey,
|
|
});
|
|
const body = core.channel.reply.formatInboundEnvelope({
|
|
channel: "BlueBubbles",
|
|
from: fromLabel,
|
|
timestamp: message.timestamp,
|
|
previousTimestamp,
|
|
envelope: envelopeOptions,
|
|
body: baseBody,
|
|
chatType: isGroup ? "group" : "direct",
|
|
sender: { name: message.senderName || undefined, id: message.senderId },
|
|
});
|
|
let chatGuidForActions = chatGuid;
|
|
if (!chatGuidForActions && baseUrl && password) {
|
|
const resolveTarget =
|
|
isGroup && (chatId || chatIdentifier)
|
|
? chatId
|
|
? ({ kind: "chat_id", chatId } as const)
|
|
: ({ kind: "chat_identifier", chatIdentifier: chatIdentifier ?? "" } as const)
|
|
: ({ kind: "handle", address: message.senderId } as const);
|
|
if (resolveTarget.kind !== "chat_identifier" || resolveTarget.chatIdentifier) {
|
|
chatGuidForActions =
|
|
(await resolveChatGuidForTarget({
|
|
baseUrl,
|
|
password,
|
|
target: resolveTarget,
|
|
allowPrivateNetwork: isPrivateNetworkOptInEnabled(account.config),
|
|
})) ?? undefined;
|
|
}
|
|
}
|
|
|
|
const ackReactionScope = config.messages?.ackReactionScope ?? "group-mentions";
|
|
const removeAckAfterReply = config.messages?.removeAckAfterReply ?? false;
|
|
const ackReactionValue = resolveBlueBubblesAckReaction({
|
|
cfg: config,
|
|
agentId: route.agentId,
|
|
core,
|
|
runtime,
|
|
});
|
|
const shouldAckReaction = () =>
|
|
Boolean(
|
|
ackReactionValue &&
|
|
core.channel.reactions.shouldAckReaction({
|
|
scope: ackReactionScope,
|
|
isDirect: !isGroup,
|
|
isGroup,
|
|
isMentionableGroup: isGroup,
|
|
requireMention,
|
|
canDetectMention,
|
|
effectiveWasMentioned,
|
|
shouldBypassMention,
|
|
}),
|
|
);
|
|
const ackMessageId = message.messageId?.trim() || "";
|
|
const ackReactionPromise =
|
|
shouldAckReaction() && ackMessageId && chatGuidForActions && ackReactionValue
|
|
? sendBlueBubblesReaction({
|
|
chatGuid: chatGuidForActions,
|
|
messageGuid: ackMessageId,
|
|
emoji: ackReactionValue,
|
|
opts: { cfg: config, accountId: account.accountId },
|
|
}).then(
|
|
() => true,
|
|
(err) => {
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`ack reaction failed chatGuid=${chatGuidForActions} msg=${ackMessageId}: ${String(err)}`,
|
|
);
|
|
return false;
|
|
},
|
|
)
|
|
: null;
|
|
|
|
// Respect sendReadReceipts config (parity with WhatsApp)
|
|
const sendReadReceipts = account.config.sendReadReceipts !== false;
|
|
if (chatGuidForActions && baseUrl && password && sendReadReceipts) {
|
|
try {
|
|
await markBlueBubblesChatRead(chatGuidForActions, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
});
|
|
logVerbose(core, runtime, `marked read chatGuid=${chatGuidForActions}`);
|
|
} catch (err) {
|
|
runtime.error?.(`[bluebubbles] mark read failed: ${String(err)}`);
|
|
}
|
|
} else if (!sendReadReceipts) {
|
|
logVerbose(core, runtime, "mark read skipped (sendReadReceipts=false)");
|
|
} else {
|
|
logVerbose(core, runtime, "mark read skipped (missing chatGuid or credentials)");
|
|
}
|
|
|
|
const outboundTarget = isGroup
|
|
? formatBlueBubblesChatTarget({
|
|
chatId,
|
|
chatGuid: chatGuidForActions ?? chatGuid,
|
|
chatIdentifier,
|
|
}) || peerId
|
|
: chatGuidForActions
|
|
? formatBlueBubblesChatTarget({ chatGuid: chatGuidForActions })
|
|
: message.senderId;
|
|
|
|
const maybeEnqueueOutboundMessageId = (messageId?: string, snippet?: string): boolean => {
|
|
const trimmed = messageId?.trim();
|
|
if (!trimmed || trimmed === "ok" || trimmed === "unknown") {
|
|
return false;
|
|
}
|
|
// Cache outbound message to get short ID
|
|
const cacheEntry = rememberBlueBubblesReplyCache({
|
|
accountId: account.accountId,
|
|
messageId: trimmed,
|
|
chatGuid: chatGuidForActions ?? chatGuid,
|
|
chatIdentifier,
|
|
chatId,
|
|
senderLabel: "me",
|
|
body: snippet ?? "",
|
|
timestamp: Date.now(),
|
|
});
|
|
const displayId = cacheEntry.shortId || trimmed;
|
|
const preview = snippet ? ` "${snippet.slice(0, 12)}${snippet.length > 12 ? "…" : ""}"` : "";
|
|
core.system.enqueueSystemEvent(`Assistant sent${preview} [message_id:${displayId}]`, {
|
|
sessionKey: route.sessionKey,
|
|
contextKey: `bluebubbles:outbound:${outboundTarget}:${trimmed}`,
|
|
});
|
|
return true;
|
|
};
|
|
const sanitizeReplyDirectiveText = (value: string): string => {
|
|
if (privateApiEnabled) {
|
|
return value;
|
|
}
|
|
return value
|
|
.replace(REPLY_DIRECTIVE_TAG_RE, " ")
|
|
.replace(/[ \t]+/g, " ")
|
|
.trim();
|
|
};
|
|
|
|
// History: in-memory rolling map with bounded API backfill retries
|
|
const historyLimit = isGroup
|
|
? (account.config.historyLimit ?? 0)
|
|
: (account.config.dmHistoryLimit ?? 0);
|
|
|
|
const historyIdentifier =
|
|
chatGuid ||
|
|
chatIdentifier ||
|
|
(chatId ? String(chatId) : null) ||
|
|
(isGroup ? null : message.senderId) ||
|
|
"";
|
|
const historyKey = historyIdentifier
|
|
? buildAccountScopedHistoryKey(account.accountId, historyIdentifier)
|
|
: "";
|
|
|
|
// Record the current message into rolling history
|
|
if (historyKey && historyLimit > 0) {
|
|
const nowMs = Date.now();
|
|
const senderLabel = message.fromMe ? "me" : message.senderName || message.senderId;
|
|
const normalizedHistoryBody = truncateHistoryBody(text, MAX_STORED_HISTORY_ENTRY_CHARS);
|
|
const currentEntries = recordPendingHistoryEntryIfEnabled({
|
|
historyMap: chatHistories,
|
|
limit: historyLimit,
|
|
historyKey,
|
|
entry: normalizedHistoryBody
|
|
? {
|
|
sender: senderLabel,
|
|
body: normalizedHistoryBody,
|
|
timestamp: message.timestamp ?? nowMs,
|
|
messageId: message.messageId ?? undefined,
|
|
}
|
|
: null,
|
|
});
|
|
pruneHistoryBackfillState();
|
|
|
|
const backfillAttempt = planHistoryBackfillAttempt(historyKey, nowMs);
|
|
if (backfillAttempt) {
|
|
try {
|
|
const backfillResult = await fetchBlueBubblesHistory(historyIdentifier, historyLimit, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
});
|
|
if (backfillResult.resolved) {
|
|
markHistoryBackfillResolved(historyKey);
|
|
}
|
|
if (backfillResult.entries.length > 0) {
|
|
const apiEntries: HistoryEntry[] = [];
|
|
for (const entry of backfillResult.entries) {
|
|
const body = truncateHistoryBody(entry.body, MAX_STORED_HISTORY_ENTRY_CHARS);
|
|
if (!body) {
|
|
continue;
|
|
}
|
|
apiEntries.push({
|
|
sender: entry.sender,
|
|
body,
|
|
timestamp: entry.timestamp,
|
|
messageId: entry.messageId,
|
|
});
|
|
}
|
|
const merged = mergeHistoryEntries({
|
|
apiEntries,
|
|
currentEntries:
|
|
currentEntries.length > 0 ? currentEntries : (chatHistories.get(historyKey) ?? []),
|
|
limit: historyLimit,
|
|
});
|
|
if (chatHistories.has(historyKey)) {
|
|
chatHistories.delete(historyKey);
|
|
}
|
|
chatHistories.set(historyKey, merged);
|
|
evictOldHistoryKeys(chatHistories);
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`backfilled ${backfillResult.entries.length} history messages for ${isGroup ? "group" : "DM"}: ${historyIdentifier}`,
|
|
);
|
|
} else if (!backfillResult.resolved) {
|
|
const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts;
|
|
const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0);
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`history backfill unresolved for ${historyIdentifier}; retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs}`,
|
|
);
|
|
}
|
|
} catch (err) {
|
|
const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts;
|
|
const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0);
|
|
logVerbose(
|
|
core,
|
|
runtime,
|
|
`history backfill failed for ${historyIdentifier}: ${String(err)} (retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs})`,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build inbound history from the in-memory map
|
|
let inboundHistory: Array<{ sender: string; body: string; timestamp?: number }> | undefined;
|
|
if (historyKey && historyLimit > 0) {
|
|
const entries = chatHistories.get(historyKey);
|
|
if (entries && entries.length > 0) {
|
|
inboundHistory = buildInboundHistorySnapshot({
|
|
entries,
|
|
limit: historyLimit,
|
|
});
|
|
}
|
|
}
|
|
const commandBody = messageText.trim();
|
|
|
|
const ctxPayload = core.channel.reply.finalizeInboundContext({
|
|
Body: body,
|
|
BodyForAgent: rawBody,
|
|
InboundHistory: inboundHistory,
|
|
RawBody: rawBody,
|
|
CommandBody: commandBody,
|
|
BodyForCommands: commandBody,
|
|
MediaUrl: mediaUrls[0],
|
|
MediaUrls: mediaUrls.length > 0 ? mediaUrls : undefined,
|
|
MediaPath: mediaPaths[0],
|
|
MediaPaths: mediaPaths.length > 0 ? mediaPaths : undefined,
|
|
MediaType: mediaTypes[0],
|
|
MediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined,
|
|
From: isGroup ? `group:${peerId}` : `bluebubbles:${message.senderId}`,
|
|
To: `bluebubbles:${outboundTarget}`,
|
|
SessionKey: route.sessionKey,
|
|
AccountId: route.accountId,
|
|
ChatType: isGroup ? "group" : "direct",
|
|
ConversationLabel: fromLabel,
|
|
// Use short ID for token savings (agent can use this to reference the message)
|
|
ReplyToId: visibleReplyToShortId || visibleReplyToId,
|
|
ReplyToIdFull: visibleReplyToId,
|
|
ReplyToBody: visibleReplyToBody,
|
|
ReplyToSender: visibleReplyToSender,
|
|
GroupSubject: groupSubject,
|
|
GroupMembers: groupMembers,
|
|
SenderName: message.senderName || undefined,
|
|
SenderId: message.senderId,
|
|
Provider: "bluebubbles",
|
|
Surface: "bluebubbles",
|
|
// Use short ID for token savings (agent can use this to reference the message)
|
|
MessageSid: messageShortId || message.messageId,
|
|
MessageSidFull: message.messageId,
|
|
Timestamp: message.timestamp,
|
|
OriginatingChannel: "bluebubbles",
|
|
OriginatingTo: `bluebubbles:${outboundTarget}`,
|
|
WasMentioned: effectiveWasMentioned,
|
|
CommandAuthorized: commandAuthorized,
|
|
});
|
|
|
|
let sentMessage = false;
|
|
let streamingActive = false;
|
|
let typingRestartTimer: NodeJS.Timeout | undefined;
|
|
const typingRestartDelayMs = 150;
|
|
const clearTypingRestartTimer = () => {
|
|
if (typingRestartTimer) {
|
|
clearTimeout(typingRestartTimer);
|
|
typingRestartTimer = undefined;
|
|
}
|
|
};
|
|
const restartTypingSoon = () => {
|
|
if (!streamingActive || !chatGuidForActions || !baseUrl || !password) {
|
|
return;
|
|
}
|
|
clearTypingRestartTimer();
|
|
typingRestartTimer = setTimeout(() => {
|
|
typingRestartTimer = undefined;
|
|
if (!streamingActive) {
|
|
return;
|
|
}
|
|
sendBlueBubblesTyping(chatGuidForActions, true, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
}).catch((err) => {
|
|
runtime.error?.(`[bluebubbles] typing restart failed: ${String(err)}`);
|
|
});
|
|
}, typingRestartDelayMs);
|
|
};
|
|
try {
|
|
const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({
|
|
cfg: config,
|
|
agentId: route.agentId,
|
|
channel: "bluebubbles",
|
|
accountId: account.accountId,
|
|
typingCallbacks: {
|
|
onReplyStart: async () => {
|
|
if (!chatGuidForActions) {
|
|
return;
|
|
}
|
|
if (!baseUrl || !password) {
|
|
return;
|
|
}
|
|
streamingActive = true;
|
|
clearTypingRestartTimer();
|
|
try {
|
|
await sendBlueBubblesTyping(chatGuidForActions, true, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
});
|
|
} catch (err) {
|
|
runtime.error?.(`[bluebubbles] typing start failed: ${String(err)}`);
|
|
}
|
|
},
|
|
onIdle: () => {
|
|
if (!chatGuidForActions) {
|
|
return;
|
|
}
|
|
if (!baseUrl || !password) {
|
|
return;
|
|
}
|
|
// Intentionally no-op for block streaming. We stop typing in finally
|
|
// after the run completes to avoid flicker between paragraph blocks.
|
|
},
|
|
},
|
|
});
|
|
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
|
|
ctx: ctxPayload,
|
|
cfg: config,
|
|
dispatcherOptions: {
|
|
...replyPipeline,
|
|
deliver: async (payload, info) => {
|
|
const rawReplyToId =
|
|
privateApiEnabled && typeof payload.replyToId === "string"
|
|
? payload.replyToId.trim()
|
|
: "";
|
|
// Resolve short ID (e.g., "5") to full UUID
|
|
const replyToMessageGuid = rawReplyToId
|
|
? resolveBlueBubblesMessageId(rawReplyToId, { requireKnownShortId: true })
|
|
: "";
|
|
const mediaList = resolveOutboundMediaUrls(payload);
|
|
if (mediaList.length > 0) {
|
|
const tableMode = core.channel.text.resolveMarkdownTableMode({
|
|
cfg: config,
|
|
channel: "bluebubbles",
|
|
accountId: account.accountId,
|
|
});
|
|
const text = sanitizeReplyDirectiveText(
|
|
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
|
|
);
|
|
await sendMediaWithLeadingCaption({
|
|
mediaUrls: mediaList,
|
|
caption: text,
|
|
send: async ({ mediaUrl, caption }) => {
|
|
const cachedBody = (caption ?? "").trim() || "<media:attachment>";
|
|
const pendingId = rememberPendingOutboundMessageId({
|
|
accountId: account.accountId,
|
|
sessionKey: route.sessionKey,
|
|
outboundTarget,
|
|
chatGuid: chatGuidForActions ?? chatGuid,
|
|
chatIdentifier,
|
|
chatId,
|
|
snippet: cachedBody,
|
|
});
|
|
let result: Awaited<ReturnType<typeof sendBlueBubblesMedia>>;
|
|
try {
|
|
result = await sendBlueBubblesMedia({
|
|
cfg: config,
|
|
to: outboundTarget,
|
|
mediaUrl,
|
|
caption: caption ?? undefined,
|
|
replyToId: replyToMessageGuid || null,
|
|
accountId: account.accountId,
|
|
});
|
|
} catch (err) {
|
|
forgetPendingOutboundMessageId(pendingId);
|
|
throw err;
|
|
}
|
|
if (maybeEnqueueOutboundMessageId(result.messageId, cachedBody)) {
|
|
forgetPendingOutboundMessageId(pendingId);
|
|
}
|
|
sentMessage = true;
|
|
statusSink?.({ lastOutboundAt: Date.now() });
|
|
if (info.kind === "block") {
|
|
restartTypingSoon();
|
|
}
|
|
},
|
|
});
|
|
return;
|
|
}
|
|
|
|
const textLimit =
|
|
account.config.textChunkLimit && account.config.textChunkLimit > 0
|
|
? account.config.textChunkLimit
|
|
: DEFAULT_TEXT_LIMIT;
|
|
const chunkMode = account.config.chunkMode ?? "length";
|
|
const tableMode = core.channel.text.resolveMarkdownTableMode({
|
|
cfg: config,
|
|
channel: "bluebubbles",
|
|
accountId: account.accountId,
|
|
});
|
|
const text = sanitizeReplyDirectiveText(
|
|
core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode),
|
|
);
|
|
const chunks =
|
|
chunkMode === "newline"
|
|
? resolveTextChunksWithFallback(
|
|
text,
|
|
core.channel.text.chunkTextWithMode(text, textLimit, chunkMode),
|
|
)
|
|
: resolveTextChunksWithFallback(
|
|
text,
|
|
core.channel.text.chunkMarkdownText(text, textLimit),
|
|
);
|
|
if (!chunks.length) {
|
|
return;
|
|
}
|
|
for (const chunk of chunks) {
|
|
const pendingId = rememberPendingOutboundMessageId({
|
|
accountId: account.accountId,
|
|
sessionKey: route.sessionKey,
|
|
outboundTarget,
|
|
chatGuid: chatGuidForActions ?? chatGuid,
|
|
chatIdentifier,
|
|
chatId,
|
|
snippet: chunk,
|
|
});
|
|
let result: Awaited<ReturnType<typeof sendMessageBlueBubbles>>;
|
|
try {
|
|
result = await sendMessageBlueBubbles(outboundTarget, chunk, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
replyToMessageGuid: replyToMessageGuid || undefined,
|
|
});
|
|
} catch (err) {
|
|
forgetPendingOutboundMessageId(pendingId);
|
|
throw err;
|
|
}
|
|
if (maybeEnqueueOutboundMessageId(result.messageId, chunk)) {
|
|
forgetPendingOutboundMessageId(pendingId);
|
|
}
|
|
sentMessage = true;
|
|
statusSink?.({ lastOutboundAt: Date.now() });
|
|
if (info.kind === "block") {
|
|
restartTypingSoon();
|
|
}
|
|
}
|
|
},
|
|
onReplyStart: typingCallbacks?.onReplyStart,
|
|
onIdle: typingCallbacks?.onIdle,
|
|
onError: (err, info) => {
|
|
// Flag the outer dedupe wrapper so it releases the claim instead
|
|
// of committing. Without this, a transient BlueBubbles send failure
|
|
// would permanently block replay-retry for 7 days and the user
|
|
// would never receive a reply to that message.
|
|
//
|
|
// Only the terminal `final` delivery represents the user-visible
|
|
// answer. The dispatcher continues past `tool` / `block` failures
|
|
// and may still deliver `final` successfully — releasing the
|
|
// dedupe claim for those would invite a replay that re-runs tool
|
|
// side effects and resends partially-delivered content.
|
|
if (info.kind === "final") {
|
|
dedupeSignal.deliveryFailed = true;
|
|
}
|
|
runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${String(err)}`);
|
|
},
|
|
},
|
|
replyOptions: {
|
|
onModelSelected,
|
|
disableBlockStreaming:
|
|
typeof account.config.blockStreaming === "boolean"
|
|
? !account.config.blockStreaming
|
|
: undefined,
|
|
},
|
|
});
|
|
} finally {
|
|
const shouldStopTyping =
|
|
Boolean(chatGuidForActions && baseUrl && password) && (streamingActive || !sentMessage);
|
|
streamingActive = false;
|
|
clearTypingRestartTimer();
|
|
if (sentMessage && chatGuidForActions && ackMessageId) {
|
|
core.channel.reactions.removeAckReactionAfterReply({
|
|
removeAfterReply: removeAckAfterReply,
|
|
ackReactionPromise,
|
|
ackReactionValue: ackReactionValue ?? null,
|
|
remove: () =>
|
|
sendBlueBubblesReaction({
|
|
chatGuid: chatGuidForActions,
|
|
messageGuid: ackMessageId,
|
|
emoji: ackReactionValue ?? "",
|
|
remove: true,
|
|
opts: { cfg: config, accountId: account.accountId },
|
|
}),
|
|
onError: (err) => {
|
|
logAckFailure({
|
|
log: (msg) => logVerbose(core, runtime, msg),
|
|
channel: "bluebubbles",
|
|
target: `${chatGuidForActions}/${ackMessageId}`,
|
|
error: err,
|
|
});
|
|
},
|
|
});
|
|
}
|
|
if (shouldStopTyping && chatGuidForActions) {
|
|
// Stop typing after streaming completes to avoid a stuck indicator.
|
|
sendBlueBubblesTyping(chatGuidForActions, false, {
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
}).catch((err) => {
|
|
logTypingFailure({
|
|
log: (msg) => logVerbose(core, runtime, msg),
|
|
channel: "bluebubbles",
|
|
action: "stop",
|
|
target: chatGuidForActions,
|
|
error: err,
|
|
});
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
export async function processReaction(
|
|
reaction: NormalizedWebhookReaction,
|
|
target: WebhookTarget,
|
|
): Promise<void> {
|
|
const { account, config, runtime, core } = target;
|
|
const pairing = createChannelPairingController({
|
|
core,
|
|
channel: "bluebubbles",
|
|
accountId: account.accountId,
|
|
});
|
|
if (reaction.fromMe) {
|
|
return;
|
|
}
|
|
|
|
const dmPolicy = account.config.dmPolicy ?? "pairing";
|
|
const groupPolicy = account.config.groupPolicy ?? "allowlist";
|
|
const storeAllowFrom = await readStoreAllowFromForDmPolicy({
|
|
provider: "bluebubbles",
|
|
accountId: account.accountId,
|
|
dmPolicy,
|
|
readStore: pairing.readStoreForDmPolicy,
|
|
});
|
|
const accessDecision = resolveDmGroupAccessWithLists({
|
|
isGroup: reaction.isGroup,
|
|
dmPolicy,
|
|
groupPolicy,
|
|
allowFrom: account.config.allowFrom,
|
|
groupAllowFrom: account.config.groupAllowFrom,
|
|
storeAllowFrom,
|
|
isSenderAllowed: (allowFrom) =>
|
|
isAllowedBlueBubblesSender({
|
|
allowFrom,
|
|
sender: reaction.senderId,
|
|
chatId: reaction.chatId ?? undefined,
|
|
chatGuid: reaction.chatGuid ?? undefined,
|
|
chatIdentifier: reaction.chatIdentifier ?? undefined,
|
|
}),
|
|
});
|
|
if (accessDecision.decision !== "allow") {
|
|
return;
|
|
}
|
|
|
|
const chatId = reaction.chatId ?? undefined;
|
|
const chatGuid = reaction.chatGuid ?? undefined;
|
|
const chatIdentifier = reaction.chatIdentifier ?? undefined;
|
|
const peerId = reaction.isGroup
|
|
? (chatGuid ?? chatIdentifier ?? (chatId ? String(chatId) : "group"))
|
|
: reaction.senderId;
|
|
const requireMention =
|
|
reaction.isGroup &&
|
|
core.channel.groups.resolveRequireMention({
|
|
cfg: config,
|
|
channel: "bluebubbles",
|
|
groupId: peerId,
|
|
accountId: account.accountId,
|
|
});
|
|
|
|
if (requireMention) {
|
|
logVerbose(core, runtime, "bluebubbles: skipping group reaction (requireMention=true)");
|
|
return;
|
|
}
|
|
|
|
const route = resolveBlueBubblesConversationRoute({
|
|
cfg: config,
|
|
accountId: account.accountId,
|
|
isGroup: reaction.isGroup,
|
|
peerId,
|
|
sender: reaction.senderId,
|
|
chatId,
|
|
chatGuid,
|
|
chatIdentifier,
|
|
});
|
|
|
|
const senderLabel = reaction.senderName || reaction.senderId;
|
|
const chatLabel = reaction.isGroup ? ` in group:${peerId}` : "";
|
|
// Use short ID for token savings
|
|
const messageDisplayId = getShortIdForUuid(reaction.messageId) || reaction.messageId;
|
|
// Format: "Tyler reacted with ❤️ [[reply_to:5]]" or "Tyler removed ❤️ reaction [[reply_to:5]]"
|
|
const text =
|
|
reaction.action === "removed"
|
|
? `${senderLabel} removed ${reaction.emoji} reaction [[reply_to:${messageDisplayId}]]${chatLabel}`
|
|
: `${senderLabel} reacted with ${reaction.emoji} [[reply_to:${messageDisplayId}]]${chatLabel}`;
|
|
core.system.enqueueSystemEvent(text, {
|
|
sessionKey: route.sessionKey,
|
|
contextKey: `bluebubbles:reaction:${reaction.action}:${peerId}:${reaction.messageId}:${reaction.senderId}:${reaction.emoji}`,
|
|
});
|
|
logVerbose(core, runtime, `reaction event enqueued: ${text}`);
|
|
}
|