- Published on
gRPC in Node.js — Protocol Buffers, Streaming, and Service-to-Service Communication
- Authors

- Name
- Sanjeev Sharma
- @webcoderspeed1
Introduction
gRPC is a high-performance RPC framework using Protocol Buffers for serialization and HTTP/2 for transport. Unlike REST APIs, gRPC offers streaming, multiplexing, and binary efficiency. Protocol Buffers provide schema versioning and language-agnostic contracts. This post covers defining protobuf schemas, implementing servers, unary vs streaming patterns, interceptors for cross-cutting concerns, grpc-gateway for REST compatibility, health checks, and reflection APIs for debugging.
- Protocol Buffer Schema Design
- Node.js gRPC Server with @grpc/grpc-js
- Unary vs Server-Streaming vs Bidirectional
- Interceptors for Auth and Logging
- gRPC-Gateway for REST Compatibility
- Health Checking and Reflection
- gRPC Production Checklist
- Conclusion
Protocol Buffer Schema Design
Define message types and services in .proto files. Protocol Buffers are versioned and backward/forward compatible.
// blog/v1/post.proto
syntax = "proto3";
package blog.v1;
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// Service definition
service PostService {
rpc CreatePost(CreatePostRequest) returns (Post);
rpc GetPost(GetPostRequest) returns (Post);
rpc ListPosts(ListPostsRequest) returns (stream Post);
rpc SubscribePostUpdates(SubscribeRequest) returns (stream PostUpdate);
rpc PublishPost(PublishPostRequest) returns (google.protobuf.Empty);
rpc DeletePost(DeletePostRequest) returns (google.protobuf.Empty);
}
// Messages
message Post {
string id = 1;
string title = 2;
string content = 3;
string author_id = 4;
repeated string tags = 5;
bool published = 6;
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
int32 view_count = 9;
}
message CreatePostRequest {
string title = 1;
string content = 2;
repeated string tags = 3;
string author_id = 4;
}
message GetPostRequest {
string id = 1;
bool include_author = 2; // For version compatibility
}
message ListPostsRequest {
int32 limit = 1;
string cursor = 2; // For pagination
repeated string tags = 3; // Filter by tags
string author_id = 4; // Filter by author
}
message PublishPostRequest {
string id = 1;
google.protobuf.Timestamp publish_at = 2; // Optional scheduled publish
}
message DeletePostRequest {
string id = 1;
bool soft_delete = 2; // Archive instead of delete
}
message PostUpdate {
enum Action {
ACTION_UNSPECIFIED = 0;
CREATED = 1;
UPDATED = 2;
DELETED = 3;
PUBLISHED = 4;
}
string post_id = 1;
Action action = 2;
Post post = 3;
google.protobuf.Timestamp timestamp = 4;
}
message SubscribeRequest {
repeated string tag_filters = 1; // Only updates matching tags
string author_id_filter = 2;
}
// Error responses using rpc error codes
// - NOT_FOUND = 5
// - INVALID_ARGUMENT = 3
// - PERMISSION_DENIED = 7
// - INTERNAL = 13
Generate Node.js code:
# Install protoc compiler
brew install protobuf
# Generate TypeScript/JS code
protoc \
--plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
--ts_proto_out=./src/generated \
--ts_proto_opt=outputServices=grpc-js \
blog/v1/post.proto
Node.js gRPC Server with @grpc/grpc-js
Implement service handlers with proper error handling and metadata.
import * as grpc from '@grpc/grpc-js';
import {
PostServiceService,
IPostServiceServer,
} from './generated/blog/v1/post';
import { Post, CreatePostRequest } from './generated/blog/v1/post';
import { Metadata, status } from '@grpc/grpc-js';
class PostServiceImpl implements IPostServiceServer {
constructor(
private db: Database,
private logger: Logger,
private cache: Cache
) {}
async createPost(
call: grpc.ServerUnaryCall<CreatePostRequest, Post>,
callback: grpc.sendUnaryData<Post>
): Promise<void> {
try {
const { title, content, tags, author_id } = call.request;
// Validate
if (!title || title.length < 5) {
return callback({
code: grpc.status.INVALID_ARGUMENT,
message: 'Title must be at least 5 characters',
});
}
if (!content || content.length < 10) {
return callback({
code: grpc.status.INVALID_ARGUMENT,
message: 'Content must be at least 10 characters',
});
}
// Verify author exists
const author = await this.db.user.findById(author_id);
if (!author) {
return callback({
code: grpc.status.NOT_FOUND,
message: `Author ${author_id} not found`,
});
}
// Create post
const post = await this.db.post.create({
id: uuid(),
title,
content,
tags,
author_id,
published: false,
created_at: new Date(),
updated_at: new Date(),
view_count: 0,
});
// Set metadata headers
const metadata = new Metadata();
metadata.add('post-id', post.id);
call.sendMetadata(metadata);
callback(null, post);
} catch (error) {
this.logger.error('createPost failed', error);
callback({
code: grpc.status.INTERNAL,
message: 'Internal server error',
});
}
}
async getPost(
call: grpc.ServerUnaryCall<GetPostRequest, Post>,
callback: grpc.sendUnaryData<Post>
): Promise<void> {
try {
const post = await this.cache.get(`post:${call.request.id}`);
if (post) {
return callback(null, post);
}
const dbPost = await this.db.post.findById(call.request.id);
if (!dbPost) {
return callback({
code: grpc.status.NOT_FOUND,
message: `Post ${call.request.id} not found`,
});
}
// Cache for 1 hour
await this.cache.set(`post:${call.request.id}`, dbPost, 3600);
callback(null, dbPost);
} catch (error) {
this.logger.error('getPost failed', error);
callback({
code: grpc.status.INTERNAL,
message: 'Internal server error',
});
}
}
async listPosts(
call: grpc.ServerWritableStream<ListPostsRequest, Post>
): Promise<void> {
try {
const { limit = 20, cursor, tags, author_id } = call.request;
// Build query
let query = this.db.post.findMany({
where: {} as any,
take: limit + 1,
cursor: cursor ? { id: cursor } : undefined,
orderBy: { created_at: 'desc' },
});
if (author_id) {
query = query.where({ author_id });
}
if (tags && tags.length > 0) {
query = query.where({
tags: { hasSome: tags },
});
}
const posts = await query;
// Stream posts
for (const post of posts.slice(0, limit)) {
call.write(post);
}
// Send status with cursor for next page
if (posts.length > limit) {
const metadata = new Metadata();
metadata.add('x-next-cursor', posts[limit].id);
call.sendMetadata(metadata);
}
call.end();
} catch (error) {
this.logger.error('listPosts failed', error);
call.destroy(error);
}
}
async subscribePostUpdates(
call: grpc.ServerWritableStream<SubscribeRequest, PostUpdate>
): Promise<void> {
const { tag_filters, author_id_filter } = call.request;
const listener = (event: PostEvent) => {
// Filter updates
if (author_id_filter && event.authorId !== author_id_filter) {
return;
}
if (tag_filters?.length > 0) {
const hasMatchingTag = event.tags.some(tag =>
tag_filters.includes(tag)
);
if (!hasMatchingTag) return;
}
// Stream update
call.write({
post_id: event.postId,
action: event.action,
post: event.post,
timestamp: new Date(),
});
};
// Subscribe to events
this.eventBus.on('post:event', listener);
// Clean up on disconnect
call.on('cancelled', () => {
this.eventBus.off('post:event', listener);
});
}
async publishPost(
call: grpc.ServerUnaryCall<PublishPostRequest, Empty>,
callback: grpc.sendUnaryData<Empty>
): Promise<void> {
try {
const post = await this.db.post.findById(call.request.id);
if (!post) {
return callback({
code: grpc.status.NOT_FOUND,
message: `Post not found`,
});
}
if (post.published) {
return callback({
code: grpc.status.FAILED_PRECONDITION,
message: 'Post already published',
});
}
await this.db.post.update(call.request.id, {
published: true,
created_at: call.request.publish_at || new Date(),
});
// Invalidate cache
await this.cache.delete(`post:${call.request.id}`);
callback(null, {});
} catch (error) {
this.logger.error('publishPost failed', error);
callback({
code: grpc.status.INTERNAL,
message: 'Internal server error',
});
}
}
async deletePost(
call: grpc.ServerUnaryCall<DeletePostRequest, Empty>,
callback: grpc.sendUnaryData<Empty>
): Promise<void> {
try {
const post = await this.db.post.findById(call.request.id);
if (!post) {
return callback({
code: grpc.status.NOT_FOUND,
message: 'Post not found',
});
}
if (call.request.soft_delete) {
await this.db.post.update(call.request.id, {
deleted_at: new Date(),
});
} else {
await this.db.post.delete(call.request.id);
}
await this.cache.delete(`post:${call.request.id}`);
callback(null, {});
} catch (error) {
this.logger.error('deletePost failed', error);
callback({
code: grpc.status.INTERNAL,
message: 'Internal server error',
});
}
}
}
// Server setup
const server = new grpc.Server();
const postService = new PostServiceImpl(db, logger, cache);
server.addService(PostServiceService, postService);
server.bindAsync(
'127.0.0.1:50051',
grpc.ServerCredentials.createInsecure(),
(error, port) => {
if (error) {
console.error('Failed to bind:', error);
return;
}
console.log(`Server running at 127.0.0.1:${port}`);
server.start();
}
);
Unary vs Server-Streaming vs Bidirectional
Choose the right communication pattern for each use case.
// 1. Unary: Single request, single response
// Best for: Simple queries, atomic operations
rpc GetPost(GetPostRequest) returns (Post);
// Usage
const response = await client.getPost(request);
// 2. Server-Streaming: Single request, many responses
// Best for: Large result sets, paginated data, subscriptions
rpc ListPosts(ListPostsRequest) returns (stream Post);
// Usage
const call = client.listPosts(request);
call.on('data', (post: Post) => {
console.log('Received post:', post.id);
});
call.on('end', () => {
console.log('Stream ended');
});
call.on('error', (error) => {
console.error('Stream error:', error);
});
// 3. Client-Streaming: Many requests, single response
// Best for: Batch uploads, bulk operations
rpc BulkImportPosts(stream ImportPostRequest) returns (BulkImportResponse);
// Server-side
async bulkImportPosts(
call: grpc.ServerReadableStream<ImportPostRequest>
): Promise<void> {
const imported: string[] = [];
call.on('data', async (request: ImportPostRequest) => {
const post = await this.db.post.create(request);
imported.push(post.id);
});
call.on('end', async () => {
call.end({
imported_count: imported.length,
ids: imported,
});
});
}
// Client-side
const call = client.bulkImportPosts((error, response) => {
if (!error) {
console.log(`Imported ${response.imported_count} posts`);
}
});
call.write({ title: 'Post 1', content: '...' });
call.write({ title: 'Post 2', content: '...' });
call.end();
// 4. Bidirectional Streaming: Many requests, many responses
// Best for: Chat, collaborative editing, real-time sync
rpc SyncPosts(stream PostSyncRequest) returns (stream PostSyncResponse);
// Server-side
async syncPosts(
call: grpc.ServerDuplexStream<PostSyncRequest, PostSyncResponse>
): Promise<void> {
call.on('data', async (request: PostSyncRequest) => {
const changes = await this.getChangesSince(request.last_sync_at);
for (const change of changes) {
call.write({
action: change.type,
post: change.post,
timestamp: change.timestamp,
});
}
});
call.on('end', () => {
call.end();
});
}
// Client-side
const call = client.syncPosts();
call.on('data', (response: PostSyncResponse) => {
console.log(`Received ${response.action} for post ${response.post.id}`);
});
// Send sync requests periodically
setInterval(() => {
call.write({
last_sync_at: lastSyncTime,
});
}, 5000);
Interceptors for Auth and Logging
Add cross-cutting concerns without modifying service implementations.
import * as grpc from '@grpc/grpc-js';
// Auth interceptor
const authInterceptor: grpc.ServerMiddleware = (
methodDescriptor,
call,
next
) => {
// Skip auth for health check
if (
call.getMethod() === '/grpc.health.v1.Health/Check' ||
call.getMethod() === '/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo'
) {
return next(call);
}
const metadata = call.metadata;
const token = metadata.get('authorization')?.[0];
if (!token) {
call.sendMetadata(new grpc.Metadata());
const trailers = new grpc.Metadata();
trailers.add('grpc-message', 'Missing authorization token');
call.sendMetadata(trailers);
call.destroy(new Error('UNAUTHENTICATED'));
return;
}
try {
const decoded = verifyJWT(String(token).replace('Bearer ', ''));
// Store user info for handler
call.user = decoded;
} catch {
call.destroy(new Error('UNAUTHENTICATED'));
return;
}
return next(call);
};
// Logging interceptor
const loggingInterceptor: grpc.ServerMiddleware = (
methodDescriptor,
call,
next
) => {
const startTime = Date.now();
const method = call.getMethod();
console.log(`[${new Date().toISOString()}] gRPC call: ${method}`);
const originalEnd = call.end.bind(call);
call.end = function(this: any) {
const duration = Date.now() - startTime;
console.log(`[${method}] completed in ${duration}ms`);
return originalEnd.apply(this, arguments);
};
return next(call);
};
// Error handling interceptor
const errorInterceptor: grpc.ServerMiddleware = (
methodDescriptor,
call,
next
) => {
try {
return next(call);
} catch (error) {
console.error('Unhandled error in gRPC call:', error);
const grpcError = {
code: grpc.status.INTERNAL,
message: 'Internal server error',
};
if (error instanceof ValidationError) {
grpcError.code = grpc.status.INVALID_ARGUMENT;
grpcError.message = error.message;
}
if (error instanceof NotFoundError) {
grpcError.code = grpc.status.NOT_FOUND;
grpcError.message = error.message;
}
call.destroy(grpcError);
}
};
// Create server with interceptors
const server = new grpc.Server({
interceptors: [authInterceptor, loggingInterceptor, errorInterceptor],
});
server.addService(PostServiceService, postService);
gRPC-Gateway for REST Compatibility
Serve both gRPC and REST from the same binary.
// blog/v1/post.proto - add annotations
syntax = "proto3";
import "google/api/annotations.proto";
service PostService {
rpc CreatePost(CreatePostRequest) returns (Post) {
option (google.api.http) = {
post: "/v1/posts"
body: "*"
};
};
rpc GetPost(GetPostRequest) returns (Post) {
option (google.api.http) = {
get: "/v1/posts/{id}"
};
};
rpc ListPosts(ListPostsRequest) returns (stream Post) {
option (google.api.http) = {
get: "/v1/posts"
};
};
rpc PublishPost(PublishPostRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/v1/posts/{id}:publish"
body: "*"
};
};
rpc DeletePost(DeletePostRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/v1/posts/{id}"
};
};
}
Generate gateway code:
protoc \
--plugin=protoc-gen-grpc-gateway \
--grpc-gateway_out=logtostderr=true:./src/generated \
--grpc-gateway_opt=paths=source_relative \
blog/v1/post.proto
Start both listeners:
import * as grpc from '@grpc/grpc-js';
import express from 'express';
import { createGrpcGateway } from '@grpc-ecosystem/grpc-gateway';
// gRPC server on :50051
const grpcServer = new grpc.Server();
grpcServer.addService(PostServiceService, postService);
grpcServer.bindAsync(
'0.0.0.0:50051',
grpc.ServerCredentials.createInsecure(),
() => {
console.log('gRPC server listening on :50051');
grpcServer.start();
}
);
// REST gateway on :8080 proxies to gRPC
const app = express();
const gateway = createGrpcGateway(
[PostServiceService],
'127.0.0.1:50051',
grpc.credentials.createInsecure()
);
app.use(gateway);
app.listen(8080, () => {
console.log('REST gateway listening on :8080');
});
// Now supports both:
// gRPC: grpcurl -plaintext 127.0.0.1:50051 blog.v1.PostService/GetPost
// REST: curl http://localhost:8080/v1/posts/123
Health Checking and Reflection
Implement health checks for load balancers and reflection for debugging.
import * as grpc from '@grpc/grpc-js';
import {
HealthService,
HealthCheckRequest,
HealthCheckResponse,
} from './generated/grpc/health/v1/health';
// Health check service
class HealthServiceImpl {
async check(
request: HealthCheckRequest,
callback: grpc.sendUnaryData<HealthCheckResponse>
): Promise<void> {
const service = request.service || 'blog.v1.PostService';
// Check database connectivity
try {
await db.query('SELECT 1');
} catch {
return callback(null, {
status: HealthCheckResponse.ServingStatus.NOT_SERVING,
});
}
// Check cache connectivity
try {
await cache.ping();
} catch {
return callback(null, {
status: HealthCheckResponse.ServingStatus.NOT_SERVING,
});
}
callback(null, {
status: HealthCheckResponse.ServingStatus.SERVING,
});
}
async watch(
call: grpc.ServerWritableStream<HealthCheckRequest, HealthCheckResponse>
): Promise<void> {
// Stream health status updates periodically
const interval = setInterval(async () => {
const status = await this.getServiceStatus();
call.write({ status });
}, 1000);
call.on('cancelled', () => {
clearInterval(interval);
call.end();
});
}
private async getServiceStatus(): Promise<HealthCheckResponse.ServingStatus> {
const isHealthy = await db.query('SELECT 1').then(() => true).catch(() => false);
return isHealthy
? HealthCheckResponse.ServingStatus.SERVING
: HealthCheckResponse.ServingStatus.NOT_SERVING;
}
}
server.addService(HealthService, new HealthServiceImpl());
// Reflection for grpcurl debugging
import { createReflectionService } from '@grpc/grpc-js-reflection';
createReflectionService(server, {
grpc: {
version: '1.0.0',
},
services: [PostServiceService],
});
// Test with grpcurl:
// grpcurl -plaintext list 127.0.0.1:50051
// grpcurl -plaintext describe 127.0.0.1:50051 blog.v1.PostService
// grpcurl -plaintext -d '{"id":"post-123"}' 127.0.0.1:50051 blog.v1.PostService/GetPost
gRPC Production Checklist
- Protocol buffers versioned, backward/forward compatible
- All RPC methods have proper error codes (NOT_FOUND, INVALID_ARGUMENT, etc)
- Interceptors handle auth, logging, error translation
- Timeouts set on all client calls
- Health check endpoint for load balancer integration
- Reflection enabled for debugging without code
- Metrics tracked (latency, error rate, request count)
- Load balancing strategy chosen (round-robin, least-request)
- TLS enabled in production (not just localhost)
- Streaming calls properly handle cancellation and cleanup
Conclusion
gRPC provides high-performance service-to-service communication with Protocol Buffers for schema versioning. Start with unary RPCs, add streaming for bulk operations, use interceptors for cross-cutting concerns, and expose both gRPC and REST endpoints via gRPC-Gateway. Monitor health checks, enable reflection for debugging, and enforce proper error handling across all services.