Published on

gRPC in Node.js — Protocol Buffers, Streaming, and Service-to-Service Communication

Authors

Introduction

gRPC is a high-performance RPC framework using Protocol Buffers for serialization and HTTP/2 for transport. Unlike REST APIs, gRPC offers streaming, multiplexing, and binary efficiency. Protocol Buffers provide schema versioning and language-agnostic contracts. This post covers defining protobuf schemas, implementing servers, unary vs streaming patterns, interceptors for cross-cutting concerns, grpc-gateway for REST compatibility, health checks, and reflection APIs for debugging.

Protocol Buffer Schema Design

Define message types and services in .proto files. Protocol Buffers are versioned and backward/forward compatible.

// blog/v1/post.proto
syntax = "proto3";

package blog.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// Service definition
service PostService {
  rpc CreatePost(CreatePostRequest) returns (Post);
  rpc GetPost(GetPostRequest) returns (Post);
  rpc ListPosts(ListPostsRequest) returns (stream Post);
  rpc SubscribePostUpdates(SubscribeRequest) returns (stream PostUpdate);
  rpc PublishPost(PublishPostRequest) returns (google.protobuf.Empty);
  rpc DeletePost(DeletePostRequest) returns (google.protobuf.Empty);
}

// Messages
message Post {
  string id = 1;
  string title = 2;
  string content = 3;
  string author_id = 4;
  repeated string tags = 5;
  bool published = 6;
  google.protobuf.Timestamp created_at = 7;
  google.protobuf.Timestamp updated_at = 8;
  int32 view_count = 9;
}

message CreatePostRequest {
  string title = 1;
  string content = 2;
  repeated string tags = 3;
  string author_id = 4;
}

message GetPostRequest {
  string id = 1;
  bool include_author = 2; // For version compatibility
}

message ListPostsRequest {
  int32 limit = 1;
  string cursor = 2; // For pagination
  repeated string tags = 3; // Filter by tags
  string author_id = 4; // Filter by author
}

message PublishPostRequest {
  string id = 1;
  google.protobuf.Timestamp publish_at = 2; // Optional scheduled publish
}

message DeletePostRequest {
  string id = 1;
  bool soft_delete = 2; // Archive instead of delete
}

message PostUpdate {
  enum Action {
    ACTION_UNSPECIFIED = 0;
    CREATED = 1;
    UPDATED = 2;
    DELETED = 3;
    PUBLISHED = 4;
  }

  string post_id = 1;
  Action action = 2;
  Post post = 3;
  google.protobuf.Timestamp timestamp = 4;
}

message SubscribeRequest {
  repeated string tag_filters = 1; // Only updates matching tags
  string author_id_filter = 2;
}

// Error responses using rpc error codes
// - NOT_FOUND = 5
// - INVALID_ARGUMENT = 3
// - PERMISSION_DENIED = 7
// - INTERNAL = 13

Generate Node.js code:

# Install protoc compiler
brew install protobuf

# Generate TypeScript/JS code
protoc \
  --plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
  --ts_proto_out=./src/generated \
  --ts_proto_opt=outputServices=grpc-js \
  blog/v1/post.proto

Node.js gRPC Server with @grpc/grpc-js

Implement service handlers with proper error handling and metadata.

import * as grpc from '@grpc/grpc-js';
import {
  PostServiceService,
  IPostServiceServer,
} from './generated/blog/v1/post';
import { Post, CreatePostRequest } from './generated/blog/v1/post';
import { Metadata, status } from '@grpc/grpc-js';

class PostServiceImpl implements IPostServiceServer {
  constructor(
    private db: Database,
    private logger: Logger,
    private cache: Cache
  ) {}

  async createPost(
    call: grpc.ServerUnaryCall<CreatePostRequest, Post>,
    callback: grpc.sendUnaryData<Post>
  ): Promise<void> {
    try {
      const { title, content, tags, author_id } = call.request;

      // Validate
      if (!title || title.length < 5) {
        return callback({
          code: grpc.status.INVALID_ARGUMENT,
          message: 'Title must be at least 5 characters',
        });
      }

      if (!content || content.length < 10) {
        return callback({
          code: grpc.status.INVALID_ARGUMENT,
          message: 'Content must be at least 10 characters',
        });
      }

      // Verify author exists
      const author = await this.db.user.findById(author_id);
      if (!author) {
        return callback({
          code: grpc.status.NOT_FOUND,
          message: `Author ${author_id} not found`,
        });
      }

      // Create post
      const post = await this.db.post.create({
        id: uuid(),
        title,
        content,
        tags,
        author_id,
        published: false,
        created_at: new Date(),
        updated_at: new Date(),
        view_count: 0,
      });

      // Set metadata headers
      const metadata = new Metadata();
      metadata.add('post-id', post.id);
      call.sendMetadata(metadata);

      callback(null, post);
    } catch (error) {
      this.logger.error('createPost failed', error);
      callback({
        code: grpc.status.INTERNAL,
        message: 'Internal server error',
      });
    }
  }

  async getPost(
    call: grpc.ServerUnaryCall<GetPostRequest, Post>,
    callback: grpc.sendUnaryData<Post>
  ): Promise<void> {
    try {
      const post = await this.cache.get(`post:${call.request.id}`);

      if (post) {
        return callback(null, post);
      }

      const dbPost = await this.db.post.findById(call.request.id);

      if (!dbPost) {
        return callback({
          code: grpc.status.NOT_FOUND,
          message: `Post ${call.request.id} not found`,
        });
      }

      // Cache for 1 hour
      await this.cache.set(`post:${call.request.id}`, dbPost, 3600);

      callback(null, dbPost);
    } catch (error) {
      this.logger.error('getPost failed', error);
      callback({
        code: grpc.status.INTERNAL,
        message: 'Internal server error',
      });
    }
  }

  async listPosts(
    call: grpc.ServerWritableStream<ListPostsRequest, Post>
  ): Promise<void> {
    try {
      const { limit = 20, cursor, tags, author_id } = call.request;

      // Build query
      let query = this.db.post.findMany({
        where: {} as any,
        take: limit + 1,
        cursor: cursor ? { id: cursor } : undefined,
        orderBy: { created_at: 'desc' },
      });

      if (author_id) {
        query = query.where({ author_id });
      }

      if (tags && tags.length > 0) {
        query = query.where({
          tags: { hasSome: tags },
        });
      }

      const posts = await query;

      // Stream posts
      for (const post of posts.slice(0, limit)) {
        call.write(post);
      }

      // Send status with cursor for next page
      if (posts.length > limit) {
        const metadata = new Metadata();
        metadata.add('x-next-cursor', posts[limit].id);
        call.sendMetadata(metadata);
      }

      call.end();
    } catch (error) {
      this.logger.error('listPosts failed', error);
      call.destroy(error);
    }
  }

  async subscribePostUpdates(
    call: grpc.ServerWritableStream<SubscribeRequest, PostUpdate>
  ): Promise<void> {
    const { tag_filters, author_id_filter } = call.request;

    const listener = (event: PostEvent) => {
      // Filter updates
      if (author_id_filter && event.authorId !== author_id_filter) {
        return;
      }

      if (tag_filters?.length > 0) {
        const hasMatchingTag = event.tags.some(tag =>
          tag_filters.includes(tag)
        );
        if (!hasMatchingTag) return;
      }

      // Stream update
      call.write({
        post_id: event.postId,
        action: event.action,
        post: event.post,
        timestamp: new Date(),
      });
    };

    // Subscribe to events
    this.eventBus.on('post:event', listener);

    // Clean up on disconnect
    call.on('cancelled', () => {
      this.eventBus.off('post:event', listener);
    });
  }

  async publishPost(
    call: grpc.ServerUnaryCall<PublishPostRequest, Empty>,
    callback: grpc.sendUnaryData<Empty>
  ): Promise<void> {
    try {
      const post = await this.db.post.findById(call.request.id);

      if (!post) {
        return callback({
          code: grpc.status.NOT_FOUND,
          message: `Post not found`,
        });
      }

      if (post.published) {
        return callback({
          code: grpc.status.FAILED_PRECONDITION,
          message: 'Post already published',
        });
      }

      await this.db.post.update(call.request.id, {
        published: true,
        created_at: call.request.publish_at || new Date(),
      });

      // Invalidate cache
      await this.cache.delete(`post:${call.request.id}`);

      callback(null, {});
    } catch (error) {
      this.logger.error('publishPost failed', error);
      callback({
        code: grpc.status.INTERNAL,
        message: 'Internal server error',
      });
    }
  }

  async deletePost(
    call: grpc.ServerUnaryCall<DeletePostRequest, Empty>,
    callback: grpc.sendUnaryData<Empty>
  ): Promise<void> {
    try {
      const post = await this.db.post.findById(call.request.id);

      if (!post) {
        return callback({
          code: grpc.status.NOT_FOUND,
          message: 'Post not found',
        });
      }

      if (call.request.soft_delete) {
        await this.db.post.update(call.request.id, {
          deleted_at: new Date(),
        });
      } else {
        await this.db.post.delete(call.request.id);
      }

      await this.cache.delete(`post:${call.request.id}`);

      callback(null, {});
    } catch (error) {
      this.logger.error('deletePost failed', error);
      callback({
        code: grpc.status.INTERNAL,
        message: 'Internal server error',
      });
    }
  }
}

// Server setup
const server = new grpc.Server();

const postService = new PostServiceImpl(db, logger, cache);
server.addService(PostServiceService, postService);

server.bindAsync(
  '127.0.0.1:50051',
  grpc.ServerCredentials.createInsecure(),
  (error, port) => {
    if (error) {
      console.error('Failed to bind:', error);
      return;
    }
    console.log(`Server running at 127.0.0.1:${port}`);
    server.start();
  }
);

Unary vs Server-Streaming vs Bidirectional

Choose the right communication pattern for each use case.

// 1. Unary: Single request, single response
// Best for: Simple queries, atomic operations
rpc GetPost(GetPostRequest) returns (Post);

// Usage
const response = await client.getPost(request);

// 2. Server-Streaming: Single request, many responses
// Best for: Large result sets, paginated data, subscriptions
rpc ListPosts(ListPostsRequest) returns (stream Post);

// Usage
const call = client.listPosts(request);
call.on('data', (post: Post) => {
  console.log('Received post:', post.id);
});
call.on('end', () => {
  console.log('Stream ended');
});
call.on('error', (error) => {
  console.error('Stream error:', error);
});

// 3. Client-Streaming: Many requests, single response
// Best for: Batch uploads, bulk operations
rpc BulkImportPosts(stream ImportPostRequest) returns (BulkImportResponse);

// Server-side
async bulkImportPosts(
  call: grpc.ServerReadableStream<ImportPostRequest>
): Promise<void> {
  const imported: string[] = [];

  call.on('data', async (request: ImportPostRequest) => {
    const post = await this.db.post.create(request);
    imported.push(post.id);
  });

  call.on('end', async () => {
    call.end({
      imported_count: imported.length,
      ids: imported,
    });
  });
}

// Client-side
const call = client.bulkImportPosts((error, response) => {
  if (!error) {
    console.log(`Imported ${response.imported_count} posts`);
  }
});

call.write({ title: 'Post 1', content: '...' });
call.write({ title: 'Post 2', content: '...' });
call.end();

// 4. Bidirectional Streaming: Many requests, many responses
// Best for: Chat, collaborative editing, real-time sync
rpc SyncPosts(stream PostSyncRequest) returns (stream PostSyncResponse);

// Server-side
async syncPosts(
  call: grpc.ServerDuplexStream<PostSyncRequest, PostSyncResponse>
): Promise<void> {
  call.on('data', async (request: PostSyncRequest) => {
    const changes = await this.getChangesSince(request.last_sync_at);

    for (const change of changes) {
      call.write({
        action: change.type,
        post: change.post,
        timestamp: change.timestamp,
      });
    }
  });

  call.on('end', () => {
    call.end();
  });
}

// Client-side
const call = client.syncPosts();

call.on('data', (response: PostSyncResponse) => {
  console.log(`Received ${response.action} for post ${response.post.id}`);
});

// Send sync requests periodically
setInterval(() => {
  call.write({
    last_sync_at: lastSyncTime,
  });
}, 5000);

Interceptors for Auth and Logging

Add cross-cutting concerns without modifying service implementations.

import * as grpc from '@grpc/grpc-js';

// Auth interceptor
const authInterceptor: grpc.ServerMiddleware = (
  methodDescriptor,
  call,
  next
) => {
  // Skip auth for health check
  if (
    call.getMethod() === '/grpc.health.v1.Health/Check' ||
    call.getMethod() === '/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo'
  ) {
    return next(call);
  }

  const metadata = call.metadata;
  const token = metadata.get('authorization')?.[0];

  if (!token) {
    call.sendMetadata(new grpc.Metadata());
    const trailers = new grpc.Metadata();
    trailers.add('grpc-message', 'Missing authorization token');
    call.sendMetadata(trailers);
    call.destroy(new Error('UNAUTHENTICATED'));
    return;
  }

  try {
    const decoded = verifyJWT(String(token).replace('Bearer ', ''));
    // Store user info for handler
    call.user = decoded;
  } catch {
    call.destroy(new Error('UNAUTHENTICATED'));
    return;
  }

  return next(call);
};

// Logging interceptor
const loggingInterceptor: grpc.ServerMiddleware = (
  methodDescriptor,
  call,
  next
) => {
  const startTime = Date.now();
  const method = call.getMethod();

  console.log(`[${new Date().toISOString()}] gRPC call: ${method}`);

  const originalEnd = call.end.bind(call);
  call.end = function(this: any) {
    const duration = Date.now() - startTime;
    console.log(`[${method}] completed in ${duration}ms`);
    return originalEnd.apply(this, arguments);
  };

  return next(call);
};

// Error handling interceptor
const errorInterceptor: grpc.ServerMiddleware = (
  methodDescriptor,
  call,
  next
) => {
  try {
    return next(call);
  } catch (error) {
    console.error('Unhandled error in gRPC call:', error);

    const grpcError = {
      code: grpc.status.INTERNAL,
      message: 'Internal server error',
    };

    if (error instanceof ValidationError) {
      grpcError.code = grpc.status.INVALID_ARGUMENT;
      grpcError.message = error.message;
    }

    if (error instanceof NotFoundError) {
      grpcError.code = grpc.status.NOT_FOUND;
      grpcError.message = error.message;
    }

    call.destroy(grpcError);
  }
};

// Create server with interceptors
const server = new grpc.Server({
  interceptors: [authInterceptor, loggingInterceptor, errorInterceptor],
});

server.addService(PostServiceService, postService);

gRPC-Gateway for REST Compatibility

Serve both gRPC and REST from the same binary.

// blog/v1/post.proto - add annotations
syntax = "proto3";

import "google/api/annotations.proto";

service PostService {
  rpc CreatePost(CreatePostRequest) returns (Post) {
    option (google.api.http) = {
      post: "/v1/posts"
      body: "*"
    };
  };

  rpc GetPost(GetPostRequest) returns (Post) {
    option (google.api.http) = {
      get: "/v1/posts/{id}"
    };
  };

  rpc ListPosts(ListPostsRequest) returns (stream Post) {
    option (google.api.http) = {
      get: "/v1/posts"
    };
  };

  rpc PublishPost(PublishPostRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/v1/posts/{id}:publish"
      body: "*"
    };
  };

  rpc DeletePost(DeletePostRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      delete: "/v1/posts/{id}"
    };
  };
}

Generate gateway code:

protoc \
  --plugin=protoc-gen-grpc-gateway \
  --grpc-gateway_out=logtostderr=true:./src/generated \
  --grpc-gateway_opt=paths=source_relative \
  blog/v1/post.proto

Start both listeners:

import * as grpc from '@grpc/grpc-js';
import express from 'express';
import { createGrpcGateway } from '@grpc-ecosystem/grpc-gateway';

// gRPC server on :50051
const grpcServer = new grpc.Server();
grpcServer.addService(PostServiceService, postService);
grpcServer.bindAsync(
  '0.0.0.0:50051',
  grpc.ServerCredentials.createInsecure(),
  () => {
    console.log('gRPC server listening on :50051');
    grpcServer.start();
  }
);

// REST gateway on :8080 proxies to gRPC
const app = express();
const gateway = createGrpcGateway(
  [PostServiceService],
  '127.0.0.1:50051',
  grpc.credentials.createInsecure()
);

app.use(gateway);
app.listen(8080, () => {
  console.log('REST gateway listening on :8080');
});

// Now supports both:
// gRPC: grpcurl -plaintext 127.0.0.1:50051 blog.v1.PostService/GetPost
// REST: curl http://localhost:8080/v1/posts/123

Health Checking and Reflection

Implement health checks for load balancers and reflection for debugging.

import * as grpc from '@grpc/grpc-js';
import {
  HealthService,
  HealthCheckRequest,
  HealthCheckResponse,
} from './generated/grpc/health/v1/health';

// Health check service
class HealthServiceImpl {
  async check(
    request: HealthCheckRequest,
    callback: grpc.sendUnaryData<HealthCheckResponse>
  ): Promise<void> {
    const service = request.service || 'blog.v1.PostService';

    // Check database connectivity
    try {
      await db.query('SELECT 1');
    } catch {
      return callback(null, {
        status: HealthCheckResponse.ServingStatus.NOT_SERVING,
      });
    }

    // Check cache connectivity
    try {
      await cache.ping();
    } catch {
      return callback(null, {
        status: HealthCheckResponse.ServingStatus.NOT_SERVING,
      });
    }

    callback(null, {
      status: HealthCheckResponse.ServingStatus.SERVING,
    });
  }

  async watch(
    call: grpc.ServerWritableStream<HealthCheckRequest, HealthCheckResponse>
  ): Promise<void> {
    // Stream health status updates periodically
    const interval = setInterval(async () => {
      const status = await this.getServiceStatus();
      call.write({ status });
    }, 1000);

    call.on('cancelled', () => {
      clearInterval(interval);
      call.end();
    });
  }

  private async getServiceStatus(): Promise<HealthCheckResponse.ServingStatus> {
    const isHealthy = await db.query('SELECT 1').then(() => true).catch(() => false);
    return isHealthy
      ? HealthCheckResponse.ServingStatus.SERVING
      : HealthCheckResponse.ServingStatus.NOT_SERVING;
  }
}

server.addService(HealthService, new HealthServiceImpl());

// Reflection for grpcurl debugging
import { createReflectionService } from '@grpc/grpc-js-reflection';

createReflectionService(server, {
  grpc: {
    version: '1.0.0',
  },
  services: [PostServiceService],
});

// Test with grpcurl:
// grpcurl -plaintext list 127.0.0.1:50051
// grpcurl -plaintext describe 127.0.0.1:50051 blog.v1.PostService
// grpcurl -plaintext -d '{"id":"post-123"}' 127.0.0.1:50051 blog.v1.PostService/GetPost

gRPC Production Checklist

  • Protocol buffers versioned, backward/forward compatible
  • All RPC methods have proper error codes (NOT_FOUND, INVALID_ARGUMENT, etc)
  • Interceptors handle auth, logging, error translation
  • Timeouts set on all client calls
  • Health check endpoint for load balancer integration
  • Reflection enabled for debugging without code
  • Metrics tracked (latency, error rate, request count)
  • Load balancing strategy chosen (round-robin, least-request)
  • TLS enabled in production (not just localhost)
  • Streaming calls properly handle cancellation and cleanup

Conclusion

gRPC provides high-performance service-to-service communication with Protocol Buffers for schema versioning. Start with unary RPCs, add streaming for bulk operations, use interceptors for cross-cutting concerns, and expose both gRPC and REST endpoints via gRPC-Gateway. Monitor health checks, enable reflection for debugging, and enforce proper error handling across all services.