Skip to content

Commit 4caefe5

Browse files
authored
Merge pull request #143 from rhetzler/ecto-2.1
Add support for Repo.stream/6
2 parents 98ff1ff + eaefd89 commit 4caefe5

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

lib/mongo_ecto.ex

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,21 @@ defmodule Mongo.Ecto do
579579
end
580580
end
581581

582+
# This can be backed by a normal mongo stream, we just have to get it to play nicely with
583+
# ecto's batch/preload functionality ( hence the map(&{nil, [&1]}) )
584+
@doc false
585+
def stream(repo, meta, {:nocache, {function, query}}, params, process, opts) do
586+
stream_or_stub = case apply(NormalizedQuery, function, [query, params]) do
587+
%{__struct__: read} = query when read in @read_queries ->
588+
Connection.read(repo, query, opts)
589+
|> Stream.map(&process_document(&1, query, process))
590+
%WriteQuery{} = write ->
591+
apply(Connection, function, [repo, write, opts])
592+
[ nil ]
593+
end
594+
|> Stream.map(&{nil, [&1]})
595+
end
596+
582597
@doc false
583598
def insert(_repo, meta, _params, _on_conflict, [_|_] = returning, _opts) do
584599
raise ArgumentError,

test/mongo_ecto/stream_test.exs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Alternative, in ecto_test.exs
2+
# Code.require_file "../../deps/ecto/integration_test/sql/stream.exs", __DIR__
3+
# Code.require_file "../deps/ecto/integration_test/sql/stream.exs", __DIR__
4+
# However this assumes everything can be run in a transaction, so it fails
5+
# a precondition of runnability.
6+
# So taking as much of the content as is viable and re-wiring it for mongo
7+
8+
defmodule Mongo.Ecto.Integration.StreamTest do
9+
use Ecto.Integration.Case
10+
11+
alias Ecto.Integration.TestRepo
12+
alias Ecto.Integration.Post
13+
alias Ecto.Integration.Comment
14+
import Ecto.Query
15+
16+
test "stream empty" do
17+
assert [] == TestRepo.stream(Post)
18+
|> Enum.to_list()
19+
20+
assert [] == TestRepo.stream(from p in Post)
21+
|> Enum.to_list()
22+
end
23+
24+
test "stream without schema" do
25+
%Post{} = TestRepo.insert!(%Post{title: "title1"})
26+
%Post{} = TestRepo.insert!(%Post{title: "title2"})
27+
28+
assert ["title1", "title2"] == TestRepo.stream(from(p in "posts", order_by: p.title, select: p.title))
29+
|> Enum.to_list()
30+
end
31+
32+
test "stream with assoc" do
33+
p1 = TestRepo.insert!(%Post{title: "1"})
34+
35+
%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
36+
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})
37+
38+
stream = TestRepo.stream(Ecto.assoc(p1, :comments))
39+
assert [c1, c2] = Enum.to_list(stream)
40+
assert c1.id == cid1
41+
assert c2.id == cid2
42+
end
43+
44+
test "stream with preload" do
45+
p1 = TestRepo.insert!(%Post{title: "1"})
46+
p2 = TestRepo.insert!(%Post{title: "2"})
47+
TestRepo.insert!(%Post{title: "3"})
48+
49+
%Comment{id: cid1} = TestRepo.insert!(%Comment{text: "1", post_id: p1.id})
50+
%Comment{id: cid2} = TestRepo.insert!(%Comment{text: "2", post_id: p1.id})
51+
%Comment{id: cid3} = TestRepo.insert!(%Comment{text: "3", post_id: p2.id})
52+
%Comment{id: cid4} = TestRepo.insert!(%Comment{text: "4", post_id: p2.id})
53+
54+
assert [p1, p2, p3] = from(p in Post, preload: [:comments], select: p)
55+
|> TestRepo.stream([max_rows: 2])
56+
|> sort_by_id()
57+
assert [%Comment{id: ^cid1}, %Comment{id: ^cid2}] = p1.comments |> sort_by_id
58+
assert [%Comment{id: ^cid3}, %Comment{id: ^cid4}] = p2.comments |> sort_by_id
59+
assert [] == p3.comments
60+
end
61+
62+
defp sort_by_id(values) do
63+
Enum.sort_by(values, &(&1.id))
64+
end
65+
66+
67+
end

0 commit comments

Comments
 (0)