Skip to content

Commit 26958d4

Browse files
sleipnirAdriano Santospolvalente
authored
[FIX] Virtual channel creation with non resolvable address (#450)
* chore: release new version * initial client lb implementation * chore: added tests and better interfaces * Update lib/grpc/client/resolver.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * chore: add lb * chore: adjustments in behaviors & create Conn module * chore: simplifying the test * chore: adjust in tests * feat: make lb work * refact: remove deprecated feature * fix: return all the arities of the connect function * chore: added deprecated * update benchmark * interop tests * fix: parallel execution * fix: correct stop * chore: correct refresh logic * chore: added some documentation * Update lib/grpc/client/load_balacing/round_robin.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/client/load_balacing/round_robin.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * accept string * use string * Update lib/grpc/client/resolver/ipv4.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/client/resolver/ipv6.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * refactor: more readable code * fix: correct code and fix tests * Update lib/grpc/client/conn.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/client/conn.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * refactor: rename and minor adjustments * ref: rename Conn to Connection * ref: Adjusts in function declaration order * Update lib/grpc/client/connection.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/client/connection.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/client/connection.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * fix: translate to english * accept indeterminism * fix: pid is always pid * fix: correct references * remove warning * feat: added client supervisor * fix: charlist errors * chore: add stream operators documentation * feat: added load balancing documentation * chore: added comment about endpoint refresh * fix: address virtual channel key * doc: added client supervisor example * refactor: better validations * fix: no case clause * fix: handle dns no address * chore: raise runtime error if not started client supervisor * Atualizar o connection.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Atualizar o connection.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --------- Co-authored-by: Adriano Santos <adriano.santos@v3.com.br> Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
1 parent 0f7b9ef commit 26958d4

43 files changed

Lines changed: 1778 additions & 299 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 136 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@
1919
- [Bidirectional Streaming](#bidirectional-streaming)
2020
- [Application Startup](#application-startup)
2121
- [Client Usage](#client-usage)
22+
- [Basic Connection and RPC](#basic-connection-and-rpc)
23+
- [Using Interceptors](#using-interceptors)
24+
- [Target Schemes and Resolvers](#target-schemes-and-resolvers)
25+
- [Supported formats](#supported-formats)
26+
- [Example (DNS)](#example-dns)
27+
- [Example (Unix socket)](#example-unix-socket)
28+
- [Compression and Metadata](#compression-and-metadata)
29+
- [Client Adapters](#client-adapters)
30+
- [Using Mint Adapter](#using-mint-adapter)
2231
- [HTTP Transcoding](#http-transcoding)
2332
- [CORS](#cors)
2433
- [Features](#features)
@@ -78,6 +87,8 @@ protoc --elixir_out=plugins=grpc:./lib -I./priv/protos helloworld.proto
7887

7988
All RPC calls must be implemented using the stream-based API, even for unary requests.
8089

90+
>__NOTE__: The old API was deprecated based on `GRPC.Server.send_reply/2` and direct `struct` returns was deprecated as of version `0.10.x`.
91+
8192
### Unary RPC using Stream API
8293

8394
```elixir
@@ -133,7 +144,30 @@ def say_bid_stream_hello(request, materializer) do
133144
|> GRPC.Stream.run_with(materializer)
134145
end
135146
```
136-
__💡__ The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex).
147+
The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below:
148+
149+
| Function | Description | Parameters / Options |
150+
|:---------------------------------|:-------------|:----------------------|
151+
| **`from(input, opts \\\\ [])`** | Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. | **Parameters:**<br>• `input` — stream, list, or gRPC struct.<br>**Options:**<br>• `:join_with` — PID or name of an external `GenStage` producer.<br>• `:dispatcher` — dispatcher module (default: `GenStage.DemandDispatcher`).<br>• `:propagate_context` — if `true`, propagates the materializer context.<br>• `:materializer` — the current `%GRPC.Server.Stream{}`.<br>• Other options supported by `Flow`. |
152+
| **`unary(input, opts \\\\ [])`** | Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. | **Parameters:**<br>• `input` — single gRPC message.<br>**Options:** same as `from/2`. |
153+
| **`to_flow(stream)`** | Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. | **Parameters:**<br>• `stream``%GRPC.Stream{}` struct. |
154+
| **`run(stream)`** | Executes the `Flow` for a unary stream and returns the first materialized result. | **Parameters:**<br>• `stream``%GRPC.Stream{}` with `unary: true` option. |
155+
| **`run_with(stream, materializer, opts \\\\ [])`** | Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `materializer``%GRPC.Server.Stream{}`.<br>**Options:**<br>• `:dry_run` — if `true`, responses are not sent. |
156+
| **`ask(stream, target, timeout \\\\ 5000)`** | Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `target` — PID or atom.<br>• `timeout` — in milliseconds. |
157+
| **`ask!(stream, target, timeout \\\\ 5000)`** | Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. |
158+
| **`filter(stream, fun)`** | Filters items in the stream by applying a concurrent predicate function. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun` — function `(item -> boolean)`. |
159+
| **`flat_map(stream, fun)`** | Applies a function returning a list or enumerable, flattening the results. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(item -> Enumerable.t())`. |
160+
| **`map(stream, fun)`** | Applies a transformation function to each item in the stream. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(item -> term)`. |
161+
| **`map_with_context(stream, fun)`** | Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(context, item -> term)`. |
162+
| **`partition(stream, opts \\\\ [])`** | Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `opts` — partitioning options (`Flow.partition/2`). |
163+
| **`reduce(stream, acc_fun, reducer_fun)`** | Reduces the stream using an accumulator, useful for aggregations. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `acc_fun` — initializer function `() -> acc`.<br>• `reducer_fun``(item, acc -> acc)`. |
164+
| **`uniq(stream)`** | Emits only distinct items from the stream (no custom uniqueness criteria). | **Parameters:**<br>• `stream``%GRPC.Stream{}`. |
165+
| **`uniq_by(stream, fun)`** | Emits only unique items based on the return value of the provided function. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(item -> term)` for uniqueness determination. |
166+
| **`get_headers(stream)`** | Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. | **Parameters:**<br>• `stream``%GRPC.Server.Stream{}`.<br>**Returns:** `map` containing decoded headers. |
167+
168+
For a complete list of available operators see [here](lib/grpc/stream.ex).
169+
170+
---
137171

138172
## Application Startup
139173

@@ -166,38 +200,125 @@ end
166200

167201
# Client Usage
168202

203+
This section demonstrates how to establish client connections and perform RPC calls using the Elixir gRPC client.
204+
205+
---
206+
207+
## Basic Connection and RPC
208+
209+
210+
Typically, you start this client supervisor as part of your application's supervision tree:
211+
212+
```elixir
213+
children = [
214+
{GRPC.Client.Supervisor, []}
215+
]
216+
217+
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
218+
Supervisor.start_link(children, opts)
219+
```
220+
221+
You can also start it manually in scripts or test environments:
222+
```elixir
223+
{:ok, _pid} = DynamicSupervisor.start_link(strategy: :one_for_one, name: GRPC.Client.Supervisor)
224+
```
225+
226+
Then connect with gRPC server:
227+
169228
```elixir
170229
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051")
171230
iex> request = Helloworld.HelloRequest.new(name: "grpc-elixir")
172231
iex> {:ok, reply} = channel |> Helloworld.GreetingServer.Stub.say_unary_hello(request)
232+
```
233+
234+
---
235+
236+
## Using Interceptors
237+
238+
Client interceptors allow you to add logic to the request/response lifecycle, such as logging, tracing, or authentication.
239+
240+
```elixir
241+
iex> {:ok, channel} =
242+
...> GRPC.Stub.connect("localhost:50051",
243+
...> interceptors: [GRPC.Client.Interceptors.Logger]
244+
...> )
245+
iex> request = Helloworld.HelloRequest.new(name: "Alice")
246+
iex> {:ok, reply} = channel |> Helloworld.GreetingServer.Stub.say_unary_hello(request)
247+
```
248+
249+
---
250+
251+
## Target Schemes and Resolvers
252+
253+
The `connect/2` function supports URI-like targets that are resolved via the internal **gRPC** [Resolver](lib/grpc/client/resolver.ex).
254+
You can connect using `DNS`, `Unix Domain sockets`, `IPv4/IPv6`, or even `xDS-based endpoints`.
255+
256+
### Supported formats:
173257

174-
# With interceptors
175-
iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051", interceptors: [GRPC.Client.Interceptors.Logger])
176-
...
258+
| Scheme | Example | Description |
259+
|:----------|:----------------------------|:---------------------------------------------|
260+
| `dns://` | `"dns://example.com:50051"` | Resolves via DNS `A/AAAA` records |
261+
| `ipv4:` | `"ipv4:10.0.0.5:50051"` | Connects directly to an IPv4 address |
262+
| `unix:` | `"unix:/tmp/service.sock"` | Connects via a Unix domain socket |
263+
| `xds:///` | `"xds:///my-service"` | Resolves via xDS control plane (Envoy/Istio) |
264+
| none | `"127.0.0.1:50051"` | Implicit DNS (default port `50051`) |
265+
266+
### Example (DNS):
267+
268+
```elixir
269+
iex> {:ok, channel} = GRPC.Stub.connect("dns://orders.prod.svc.cluster.local:50051")
270+
iex> request = Orders.GetOrderRequest.new(id: "123")
271+
iex> {:ok, reply} = channel |> Orders.OrderService.Stub.get_order(request)
177272
```
178273

179-
Check the [examples](examples) and [interop](interop) directories in the project's source code for some examples.
274+
### Example (Unix socket):
275+
276+
```elixir
277+
iex> {:ok, channel} = GRPC.Stub.connect("unix:/tmp/my.sock")
278+
```
279+
280+
>__NOTE__: When using `DNS` or `xDS` targets, the connection layer periodically refreshes endpoints.
281+
---
282+
283+
## Compression and Metadata
284+
285+
You can specify message compression and attach default headers to all requests.
286+
287+
```elixir
288+
iex> {:ok, channel} =
289+
...> GRPC.Stub.connect("localhost:50051",
290+
...> compressor: GRPC.Compressor.Gzip,
291+
...> headers: [{"authorization", "Bearer my-token"}]
292+
...> )
293+
```
294+
295+
---
296+
297+
## Client Adapters
180298

181-
## Client Adapter and Configuration
299+
By default, `GRPC.Stub.connect/2` uses the **Gun** adapter.
300+
You can switch to **Mint** (pure Elixir HTTP/2) or other adapters as needed.
182301

183-
The default adapter used by `GRPC.Stub.connect/2` is `GRPC.Client.Adapter.Gun`. Another option is to use `GRPC.Client.Adapters.Mint` instead, like so:
302+
### Using Mint Adapter
184303

185304
```elixir
186-
GRPC.Stub.connect("localhost:50051",
187-
# Use Mint adapter instead of default Gun
188-
adapter: GRPC.Client.Adapters.Mint
189-
)
305+
iex> GRPC.Stub.connect("localhost:50051",
306+
...> adapter: GRPC.Client.Adapters.Mint
307+
...> )
190308
```
191309

192-
The `GRPC.Client.Adapters.Mint` adapter accepts custom configuration. To do so, you can configure it from your mix application via:
310+
You can configure adapter options globally via your application’s config:
193311

194312
```elixir
195-
# File: your application's config file.
196-
config :grpc, GRPC.Client.Adapters.Mint, custom_opts
313+
# File: config/config.exs
314+
config :grpc, GRPC.Client.Adapters.Mint,
315+
timeout: 10_000,
316+
transport_opts: [cacertfile: "/etc/ssl/certs/ca-certificates.crt"]
197317
```
198318

199-
The accepted options for configuration are the ones listed on [Mint.HTTP.connect/4](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options)
319+
The accepted options are the same as [`Mint.HTTP.connect/4`](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options).
200320

321+
---
201322

202323
### **HTTP Transcoding**
203324

benchmark/lib/grpc/core/stats.pb.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Grpc.Core.Bucket do
22
@moduledoc false
33

4-
use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3
4+
use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3
55

66
field :start, 1, type: :double
77
field :count, 2, type: :uint64
@@ -10,15 +10,15 @@ end
1010
defmodule Grpc.Core.Histogram do
1111
@moduledoc false
1212

13-
use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3
13+
use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3
1414

1515
field :buckets, 1, repeated: true, type: Grpc.Core.Bucket
1616
end
1717

1818
defmodule Grpc.Core.Metric do
1919
@moduledoc false
2020

21-
use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3
21+
use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3
2222

2323
oneof :value, 0
2424

@@ -30,7 +30,7 @@ end
3030
defmodule Grpc.Core.Stats do
3131
@moduledoc false
3232

33-
use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3
33+
use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3
3434

3535
field :metrics, 1, repeated: true, type: Grpc.Core.Metric
3636
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,21 @@
1+
defmodule Grpc.Testing.BenchmarkService.Service do
2+
@moduledoc false
13

4+
use GRPC.Service, name: "grpc.testing.BenchmarkService", protoc_gen_elixir_version: "0.14.0"
5+
6+
rpc :UnaryCall, Grpc.Testing.SimpleRequest, Grpc.Testing.SimpleResponse
7+
8+
rpc :StreamingCall, stream(Grpc.Testing.SimpleRequest), stream(Grpc.Testing.SimpleResponse)
9+
10+
rpc :StreamingFromClient, stream(Grpc.Testing.SimpleRequest), Grpc.Testing.SimpleResponse
11+
12+
rpc :StreamingFromServer, Grpc.Testing.SimpleRequest, stream(Grpc.Testing.SimpleResponse)
13+
14+
rpc :StreamingBothWays, stream(Grpc.Testing.SimpleRequest), stream(Grpc.Testing.SimpleResponse)
15+
end
16+
17+
defmodule Grpc.Testing.BenchmarkService.Stub do
18+
@moduledoc false
19+
20+
use GRPC.Stub, service: Grpc.Testing.BenchmarkService.Service
21+
end

0 commit comments

Comments
 (0)