Step 4: Function Nodes and Logic
In this step, we will explore the function nodes and logic used in the MCP Server implementation using Node-RED.
Operation Handling
The MCP Server handles different types of operations, such as initialization, notifications, and tool calls. These operations are processed using a switch node that routes messages based on the method property:
// Switch node configuration
msg.payload.method === "initialize" ? output 1
msg.payload.method === "notifications/initialized" ? output 2
msg.payload.method === "tools/list" ? output 3
msg.payload.method === "tools/call" ? output 4
Initialization
When a client connects, the initialization process is handled by a function node:
// Initialization function
let sessions = global.get("sse_sessions") || {};
let session = sessions[msg.req.query.sessionId];
if (session) {
session.messages = session.messages || [];
msg.info = {
"result": {
"protocolVersion": "2024-11-05",
"capabilities": { "tools": {} },
"serverInfo": {
"name": "Demo",
"version": "1.0.0"
}
},
"jsonrpc": "2.0",
"id": session.messages.length
};
session.messages.push({
data: msg.info, // message to send
sent: false // initially unsent
});
global.set("sse_sessions", sessions);
}
msg.payload = "Accepted";
msg.res._res.setHeader("Content-Type", "text/plain");
Tool Listing
The server provides a list of available tools to the client through another function node:
// Tool listing function
let sessions = global.get("sse_sessions") || {};
let session = sessions[msg.req.query.sessionId];
if (session) {
session.messages = session.messages || [];
msg.tools = {
"result": {
"tools": [{
"name": "add",
"inputSchema": {
"type": "object",
"properties": {
"a": { "type": "number" },
"b": { "type": "number" }
},
"required": ["a", "b"],
"additionalProperties": false,
"$schema": "http://json-schema.org/draft-07/schema#"
}
}]
},
"jsonrpc": "2.0",
"id": session.messages.length
};
session.messages.push({
data: msg.tools, // message to send
sent: false // initially unsent
});
global.set("sse_sessions", sessions);
}
msg.payload = "Accepted";
msg.res._res.setHeader("Content-Type", "text/plain");
Tool Execution
When a client requests to execute a tool, the server processes the request with a function node:
// Tool execution function
if (msg.payload.params.name == "add") {
msg.result = msg.payload.params.arguments.a + msg.payload.params.arguments.b;
let sessions = global.get("sse_sessions") || {};
let session = sessions[msg.req.query.sessionId];
if (session) {
session.messages = session.messages || [];
let message = {
"result": {
"content": [{
"type": "text",
"text": msg.result.toString()
}]
},
"jsonrpc": "2.0",
"id": session.messages.length
};
session.messages.push({
data: message, // message to send
sent: false // initially unsent
});
global.set("sse_sessions", sessions);
}
}
msg.res._res.setHeader("Content-Type", "text/plain");
Complete JSON Export
Below is the complete JSON export for the MCP Server, which you can import into Node-RED to replicate the entire setup:
[
{
"id": "c882531a6a2da6a2",
"type": "http in",
"z": "3321740562069c7c",
"name": "",
"url": "/sse",
"method": "get",
"upload": false,
"swaggerDoc": "",
"x": 100,
"y": 60,
"wires": [
[
"2a76a8cf4603effc",
"2f4cbc634ee2512c"
]
]
},
{
"id": "2a76a8cf4603effc",
"type": "debug",
"z": "3321740562069c7c",
"name": "debug 986",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 490,
"y": 60,
"wires": []
},
{
"id": "5da7c291851c53d5",
"type": "http in",
"z": "3321740562069c7c",
"name": "",
"url": "/sse/message",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 130,
"y": 200,
"wires": [
[
"33dc78a7f039bade"
]
]
},
{
"id": "2f4cbc634ee2512c",
"type": "function",
"z": "3321740562069c7c",
"name": "function 877",
"func": "// Get or init connection map\nlet seenIPs = global.get(\"seenIPs\") || {};\n\nfunction generateUUID() {\n return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {\n const r = Math.random() * 16 | 0,\n v = c === 'x' ? r : (r & 0x3 | 0x8);\n return v.toString(16);\n });\n}\n\nconst sessionId = generateUUID();\nconst endpoint = `/sse/message?sessionId=${sessionId}`;\n\n// Store session (optional)\nlet sessions = global.get(\"sse_sessions\") || {};\nsessions[sessionId] = {\n createdAt: new Date().toISOString(),\n sseRes: msg.res._res\n};\nglobal.set(\"sse_sessions\", sessions);\n\n// Set headers\nmsg.res._res.setHeader(\"Content-Type\", \"text/event-stream\");\nmsg.res._res.setHeader(\"Cache-Control\", \"no-cache\");\nmsg.res._res.setHeader(\"Connection\", \"keep-alive\");\nmsg.res._res.setHeader(\"access-control-max-age\", \"86400\");\nmsg.res._res.setHeader(\"Access-Control-Allow-Origin\", \"*\");\nmsg.res._res.setHeader(\"Access-Control-Allow-Methods\", \"GET, POST, OPTIONS\");\nmsg.res._res.setHeader(\"Access-Control-Allow-Headers\", \"Authorization, *\");\n\n// Send initial SSE message with endpoint info\nmsg.res._res.write(`event: endpoint\\ndata: ${msg.endpoint}\\n\\n`);\n\nconst messagePoller = setInterval(() => {\n const currentSessions = global.get(\"sse_sessions\") || {};\n const session = currentSessions[sessionId];\n\n if (!session) {\n clearInterval(messagePoller);\n return;\n }\n\n session.messages = session.messages || [];\n\n for (let i = 0; i < session.messages.length; i++) {\n const msgObj = session.messages[i];\n if (!msgObj.sent) {\n session.sseRes.write(`event: message\\ndata: ${JSON.stringify(msgObj.data)}\\n\\n`);\n msgObj.sent = true;\n }\n }\n\n // Save session state\n currentSessions[sessionId] = session;\n global.set(\"sse_sessions\", currentSessions);\n}, 1000);\n\n\nreturn null;",
"outputs": 1,
"timeout": 0,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 290,
"y": 60,
"wires": [
[
"2a76a8cf4603effc"
]
]
},
{
"id": "33dc78a7f039bade",
"type": "function",
"z": "3321740562069c7c",
"name": "Check Session ID",
"func": "let sessions = global.get(\"sse_sessions\") || {};\nconst sessionId = msg.req.query.sessionId;\n\n\nif (!sessions[sessionId]) {\n node.warn(`No active SSE stream for session: ${sessionId}`);\n return null;\n}\nreturn msg;",
"outputs": 1,
"timeout": 0,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 330,
"y": 200,
"wires": [
[
"a31253cbf071550e",
"be61764cea585437"
]
]
},
{
"id": "a31253cbf071550e",
"type": "switch",
"z": "3321740562069c7c",
"name": "Method",
"property": "payload.method",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "initialize",
"vt": "str"
},
{
"t": "eq",
"v": "notifications/initialized",
"vt": "str"
},
{
"t": "eq",
"v": "tools/list",
"vt": "str"
},
{
"t": "eq",
"v": "tools/call",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 4,
"x": 520,
"y": 200,
"wires": [
[
"ba95d5d67afd4e2c"
],
[
"e0241c575cc64d24"
],
[
"eb17cecf247e47ce"
],
[
"b3bfb7383e55497d"
]
]
},
{
"id": "ba95d5d67afd4e2c",
"type": "function",
"z": "3321740562069c7c",
"name": "initialize",
"func": "let sessions = global.get(\"sse_sessions\") || {};\nlet session = sessions[msg.req.query.sessionId];\n\n\nif (session) {\n session.messages = session.messages || [];\n msg.info = { \"result\": { \"protocolVersion\": \"2024-11-05\", \"capabilities\": { \"tools\": {} }, \"serverInfo\": { \"name\": \"Demo\", \"version\": \"1.0.0\" } }, \"jsonrpc\": \"2.0\", \"id\": session.messages.length }\n session.messages.push({\n data: msg.info, // message to send\n sent: false // initially unsent\n });\n\n global.set(\"sse_sessions\", sessions);\n}\n\nmsg.payload = \"Accepted\"\nmsg.res._res.setHeader(\"Content-Type\", \"text/plain\");\n\n\n\n\nreturn msg;",
"outputs": 1,
"timeout": 0,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 680,
"y": 140,
"wires": [
[
"516a3bd424ce196d",
"edd192a7719aaaa4"
]
]
},
{
"id": "eb17cecf247e47ce",
"type": "function",
"z": "3321740562069c7c",
"name": "tools/list",
"func": "let sessions = global.get(\"sse_sessions\") || {};\nlet session = sessions[msg.req.query.sessionId];\n\nif (session) {\n session.messages = session.messages || [];\n msg.tools = { \"result\": { \"tools\": [{ \"name\": \"add\", \"inputSchema\": { \"type\": \"object\", \"properties\": { \"a\": { \"type\": \"number\" }, \"b\": { \"type\": \"number\" } }, \"required\": [\"a\", \"b\"], \"additionalProperties\": false, \"$schema\": \"http://json-schema.org/draft-07/schema#\" } }] }, \"jsonrpc\": \"2.0\", \"id\": session.messages.length }\n\n session.messages.push({\n data: msg.tools, // message to send\n sent: false // initially unsent\n });\n\n global.set(\"sse_sessions\", sessions);\n}\n\nmsg.payload = \"Accepted\"\nmsg.res._res.setHeader(\"Content-Type\", \"text/plain\");\n\nreturn msg;",
"outputs": 1,
"timeout": 0,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 680,
"y": 220,
"wires": [
[
"e4e373185b6792ed",
"edd192a7719aaaa4"
]
]
},
{
"id": "b3bfb7383e55497d",
"type": "function",
"z": "3321740562069c7c",
"name": "tools/call",
"func": "if (msg.payload.params.name == \"add\") {\n msg.result = msg.payload.params.arguments.a + msg.payload.params.arguments.b\n\n let sessions = global.get(\"sse_sessions\") || {};\n let session = sessions[msg.req.query.sessionId];\n\n if (session) {\n session.messages = session.messages || [];\n let message = { \"result\": { \"content\": [{ \"type\": \"text\", \"text\": msg.result.toString() }] }, \"jsonrpc\": \"2.0\", \"id\": session.messages.length };\n session.messages.push({\n data: message, // message to send\n sent: false // initially unsent\n });\n\n global.set(\"sse_sessions\", sessions);\n }\n}\n\nmsg.res._res.setHeader(\"Content-Type\", \"text/plain\");\n\nreturn msg;",
"outputs": 1,
"timeout": 0,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 680,
"y": 280,
"wires": [
[
"aa1794605e3d49e3"
]
]
}
]
Conclusion
With these four steps, you've learned how to build your own MCP Server using Node-RED. This implementation provides a solid foundation for creating custom MCP Servers that can interact with AI models through the Model Context Protocol.
To get started with your own implementation:
- Import the JSON configurations provided at the end of each step.
- Customize the server according to your specific requirements.
- Test your server with an MCP-compatible client.
Once your server is running, it can be integrated with various AI models and applications, enabling a wide range of interactive capabilities.