grpc.cr

Test Lines of Code

📡 gRPC for Crystal built on libnghttp2.

This project depends on the published proto shard for protobuf message types and the protoc-gen-crystal plugin.

grpc.cr focuses on gRPC transport, service/client stubs, streaming, and interceptors.

Installation

dependencies:
  grpc:
    github: kojix2/grpc

Install system dependency (libnghttp2):

brew install nghttp2          # macOS
apt install libnghttp2-dev    # Debian/Ubuntu
pacman -S nghttp2             # Arch

Install shards:

shards install

Testing

Default suite:

crystal spec

grpcurl E2E entry point:

crystal spec spec/grpcurl.cr

GripMock E2E entry point (requires GripMock at 127.0.0.1:4770 and admin API at 127.0.0.1:4771):

crystal spec spec/gripmock.cr

Code Generation

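grpc.cr uses two protoc plugins: protoc-gen-crystal (from the proto shard), which generates protobuf message types, and protoc-gen-crystal-grpc (from this repository), which generates gRPC stubs.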

1) Build both plugins

# protobuf plugin from the installed proto shard dependency
crystal build lib/proto/src/protoc-gen-crystal_main.cr -o bin/protoc-gen-crystal

# gRPC plugin from this repository
crystal build src/protoc-gen-crystal-grpc_main.cr -o bin/protoc-gen-crystal-grpc

2) Generate protobuf types first (.pb.cr)

protoc --plugin=protoc-gen-crystal=bin/protoc-gen-crystal \
  --crystal_out=. helloworld.proto

3) Generate gRPC stubs (.grpc.cr)

protoc --plugin=protoc-gen-crystal-grpc=bin/protoc-gen-crystal-grpc \
  --crystal-grpc_out=. helloworld.proto

protoc-gen-crystal-grpc expects message types generated by protoc-gen-crystal.

Require order in application code:

require "grpc"
require "./helloworld.pb.cr"
require "./helloworld.grpc.cr"

Generated .grpc.cr code reopens the service namespace and places metadata, server base class, and typed client together:

module Helloworld
  module Greeter
    FULL_NAME = "helloworld.Greeter"

    abstract class Service < GRPC::Service
    end

    class Client
    end
  end
end

Unary Example

Server:

require "grpc"

class GreeterImpl < Helloworld::Greeter::Service
  def say_hello(req : Helloworld::HelloRequest, ctx : GRPC::ServerContext) : Helloworld::HelloReply
    Helloworld::HelloReply.new(message: "Hello, #{req.name}!")
  end
end

server = GRPC::Server.new
server.handle GreeterImpl.new
server.listen "0.0.0.0", 50051

Client:

require "grpc"

channel = GRPC::Channel.new("localhost:50051")
client  = Helloworld::Greeter::Client.new(channel)
reply   = client.say_hello(Helloworld::HelloRequest.new(name: "Alice"))
puts reply.message
channel.close

Health Checking

enable_health_checking registers the built-in grpc.health.v1.Health service and returns a reporter for status updates.

server = GRPC::Server.new
reporter = server.enable_health_checking

reporter.set_status("", Grpc::Health::V1::HealthCheckResponse::ServingStatus::SERVING)
reporter.set_status(Helloworld::Greeter::FULL_NAME, Grpc::Health::V1::HealthCheckResponse::ServingStatus::SERVING)

Metadata, Deadlines, and Errors

Client context with metadata and deadline:

ctx = GRPC::ClientContext.new(
  metadata: {"authorization" => "Bearer token"},
  deadline: 5.seconds,
)
reply = client.say_hello(req, ctx: ctx)

Error handling:

begin
  reply = client.say_hello(req)
rescue ex : GRPC::StatusError
  puts ex.code
  puts ex.message
end

Server-side status error:

def say_hello(req, ctx)
  raise GRPC::StatusError.new(GRPC::StatusCode::NOT_FOUND, "user not found")
end

Streaming

Generated stubs expose typed APIs for all four RPC styles.

# Server streaming
stream = client.range(Numbers::Number.new(5))
stream.each { |num| puts num.value }
puts stream.status

# Client streaming
call = client.sum
(1..5).each { |i| call.send(Numbers::Number.new(i)) }
total = call.close_and_recv

# Bidirectional streaming
bidi = client.transform
[2, 3, 4].each { |i| bidi.send(Numbers::Number.new(i)) }
bidi.close_send
bidi.each { |num| puts num.value }

Cancel a stream:

stream = client.range(req)
stream.cancel

TLS

# Server
server = GRPC::Server.new
server.use_tls(cert: "server.crt", key: "server.key")

# Client with system CA
channel = GRPC::Channel.new("https://host:50443")

# Client with custom context
ctx = OpenSSL::SSL::Context::Client.new
ctx.ca_certificates = "ca.crt"
channel = GRPC::Channel.new("https://host:50443", tls_context: ctx)

Interceptors

class LoggingInterceptor < GRPC::ServerInterceptor
  def call(request : GRPC::RequestEnvelope,
           ctx : GRPC::ServerContext,
           next_call : GRPC::UnaryServerCall) : GRPC::ResponseEnvelope
    STDERR.puts "-> #{request.info.method_path}"
    next_call.call(request.info.method_path, request, ctx)
  end
end

server.intercept LoggingInterceptor.new

Client interceptor:

class AuthInterceptor < GRPC::ClientInterceptor
  def call(request : GRPC::RequestEnvelope,
           ctx : GRPC::ClientContext,
           next_call : GRPC::UnaryClientCall) : GRPC::ResponseEnvelope
    ctx.metadata.set("authorization", "Bearer #{@token}")
    next_call.call(request.info.method_path, request, ctx)
  end
end

channel = GRPC::Channel.new("localhost:50051",
  interceptors: [AuthInterceptor.new(token)] of GRPC::ClientInterceptor)

RequestEnvelope includes #raw, #info, and #decode(T).

Channel Configuration

EndpointConfig groups transport and lifecycle options.

config = GRPC::EndpointConfig.new(
  connect_timeout:   5.seconds,
  tcp_keepalive:     30.seconds,
  concurrency_limit: 20,
  rate_limit:        GRPC::RateLimitConfig.new(200_u64, 1.second),
)
channel = GRPC::Channel.new("https://api.example.com:443", endpoint_config: config)

Transport Factory Injection

Channel and Server accept factory procs to swap transport implementations.

client_factory = ->(host : String, port : Int32, use_tls : Bool,
                    tls_ctx : OpenSSL::SSL::Context::Client?,
                    config : GRPC::EndpointConfig) {
  GRPC::Transport::Http2ClientConnection.new(host, port, use_tls, tls_ctx, config)
    .as(GRPC::Transport::ClientTransport)
}

server_factory = ->(io : IO,
                    services : Hash(String, GRPC::Service),
                    interceptors : Array(GRPC::ServerInterceptor),
                    peer : String,
                    tls_sock : OpenSSL::SSL::Socket::Server?) {
  GRPC::Transport::Http2ServerConnection.new(io, services, interceptors, peer, tls_sock)
    .as(GRPC::Transport::ServerTransport)
}

channel = GRPC::Channel.new("localhost:50051", transport_factory: client_factory)
server = GRPC::Server.new(transport_factory: server_factory)

Examples

Unary:

crystal run examples/helloworld/server.cr
crystal run examples/helloworld/client.cr -- localhost 50051 Alice

Streaming:

crystal run examples/streaming/server.cr
crystal run examples/streaming/client.cr

Interceptors:

crystal run examples/interceptors/server.cr
crystal run examples/interceptors/client.cr -- localhost 50053 Alice secret

TLS:

openssl req -x509 -newkey rsa:2048 -keyout server.key -out server.crt \
  -days 365 -nodes -subj "/CN=localhost"

crystal run examples/tls/server.cr -- server.crt server.key
crystal run examples/tls/client.cr -- localhost 50443 Alice

License

MIT