Skip to content

Commit d829e47

Browse files
committed
init 2.0
1 parent 97b04a5 commit d829e47

44 files changed

Lines changed: 832 additions & 157 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/main/java/de/mediathekview/mserver/base/utils/CheckUrlAvailability.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,23 @@ public CheckUrlAvailability(final long minFileSize, final long timeoutInSec, fin
3232
fsd = new FileSizeDeterminer(30L, 30L, numberOfThreads);
3333
}
3434

35-
public Filmlist getAvaiableFilmlist(final Filmlist importList) {
36-
LOG.debug("start getAvaiableFilmlist(minSize {} byte, timeout {} sec)", this.minFileSize, (this.timeoutInMS/1000));
35+
public Filmlist getAvailableFilmlist(final Filmlist importList) {
36+
return getAvailableFilmlist(importList, true);
37+
}
38+
public Filmlist getAvailableFilmlist(final Filmlist importList, final boolean available) {
39+
LOG.debug("start getAvailableFilmlist(minSize {} byte, timeout {} sec)", this.minFileSize, (this.timeoutInMS/1000));
3740
start = System.currentTimeMillis();
3841
Filmlist filteredFilmlist = new Filmlist();
3942
filteredFilmlist.setCreationDate(importList.getCreationDate());
4043
filteredFilmlist.setListId(importList.getListId());
4144
//
4245
ForkJoinPool customThreadPool = new ForkJoinPool(numberOfThreads);
4346
customThreadPool.submit(() -> importList.getFilms().values().parallelStream()
44-
.filter(this::isAvailable)
47+
.filter(film -> isAvailable(film) == available)
4548
.forEach(filteredFilmlist::add))
4649
.join();
4750
customThreadPool.shutdown();
51+
customThreadPool.close();
4852
//
4953
LOG.debug("checked {} urls and removed {} in {} sec and timeout was reached: {}", importList.getFilms().size(), removedCounter.get(), ((System.currentTimeMillis()-start)/1000), timeout.get());
5054
return filteredFilmlist;
@@ -55,8 +59,10 @@ private boolean isAvailable(Film pFilm) {
5559
timeout.set(true);
5660
return true;
5761
}
58-
59-
String normalUrl = pFilm.getUrl(Resolution.NORMAL).getUrl().toString();
62+
if(pFilm.getDefaultUrl().isEmpty()) {
63+
System.out.println("asdf");
64+
}
65+
String normalUrl = pFilm.getDefaultUrl().get().getUrl().toString();
6066
ResponseInfo ri = fsd.getRequestInfo(normalUrl);
6167

6268
if (pFilm.getThema().equalsIgnoreCase("Livestream")) {
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
package de.mediathekview.mserver.base.utils;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.GsonBuilder;
5+
6+
import de.mediathekview.mserver.daten.Film;
7+
import de.mediathekview.mserver.daten.Filmlist;
8+
import de.mediathekview.mserver.daten.GsonDurationAdapter;
9+
import de.mediathekview.mserver.daten.GsonLocalDateTimeAdapter;
10+
11+
import javax.sql.DataSource;
12+
13+
import org.apache.logging.log4j.LogManager;
14+
import org.apache.logging.log4j.Logger;
15+
16+
import java.io.IOException;
17+
import java.sql.Connection;
18+
import java.sql.PreparedStatement;
19+
import java.sql.ResultSet;
20+
import java.sql.SQLException;
21+
import java.time.Duration;
22+
import java.time.LocalDateTime;
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.Comparator;
26+
import java.util.HashMap;
27+
import java.util.HashSet;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Optional;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.function.Function;
35+
import java.util.stream.Collectors;
36+
37+
/**
38+
* Service zum Speichern einzelner Filme aus einer Filmlist in die DB. Nutzt
39+
* Batch-UPSERT und einen vorhandenen ExecutorService für Parallelität.
40+
*/
41+
public class FilmDBService {
42+
private static final Logger LOG = LogManager.getLogger(FilmDBService.class);
43+
private final DataSource dataSource;
44+
private final Gson gson;
45+
private final ExecutorService executorService;
46+
private final int batchSize;
47+
48+
public FilmDBService(DataSource dataSource, ExecutorService executorService, int batchSize) {
49+
this.dataSource = dataSource;
50+
this.executorService = executorService;
51+
this.batchSize = batchSize;
52+
53+
this.gson = new GsonBuilder().registerTypeAdapter(LocalDateTime.class, new GsonLocalDateTimeAdapter())
54+
.registerTypeAdapter(Duration.class, new GsonDurationAdapter()).create();
55+
}
56+
57+
58+
public void update(String sql) {
59+
try (Connection con = dataSource.getConnection(); PreparedStatement ps = con.prepareStatement(sql)) {
60+
LOG.debug("updated {} rows", ps.executeUpdate());
61+
} catch (Exception e) {
62+
LOG.error(e);
63+
}
64+
}
65+
66+
/////////////////////////////////////////////////////////////////////////////////////////
67+
/////////////////////////////////////////////////////////////////////////////////////////
68+
69+
public void deleteFilms(Collection<Film> abandonedFilmlist) {
70+
try {
71+
List<Future<List<Film>>> futures = new ArrayList<>();
72+
List<Film> allVideos = abandonedFilmlist.stream()
73+
.sorted(Comparator.comparing(Film::getId))
74+
.toList();
75+
for (int i = 0; i < allVideos.size(); i += batchSize) {
76+
int from = i;
77+
int to = Math.min(i + batchSize, allVideos.size());
78+
List<Film> batch = allVideos.subList(from, to);
79+
futures.add(executorService.submit(() -> {
80+
List<Film> newVideos = new ArrayList<>();
81+
String sql = "DELETE FROM filme WHERE id = ?";
82+
try (Connection con = dataSource.getConnection(); PreparedStatement ps = con.prepareStatement(sql)) {
83+
for (Film video : batch) {
84+
ps.setString(1, video.getId());
85+
ps.addBatch();
86+
}
87+
ps.executeBatch();
88+
} catch (SQLException e) {
89+
LOG.error(e);
90+
}
91+
return newVideos;
92+
}));
93+
}
94+
List<Film> result = new ArrayList<>();
95+
for (Future<List<Film>> f : futures) {
96+
result.addAll(f.get());
97+
}
98+
LOG.debug("deleted {}", abandonedFilmlist.size());
99+
100+
} catch (Exception e) {
101+
LOG.error(e);
102+
}
103+
}
104+
105+
/////////////////////////////////////////////////////////////////////////////////////////
106+
/////////////////////////////////////////////////////////////////////////////////////////
107+
108+
public Optional<Filmlist> readFilmlistFromDB() {
109+
return readFilmlistFromDB("");
110+
}
111+
112+
public Optional<Filmlist> readFilmlistFromDB(String where) {
113+
LOG.debug("fetch data from DB");
114+
int readCounter = 0;
115+
Filmlist list = new Filmlist();
116+
try (Connection con = dataSource.getConnection();
117+
PreparedStatement ps = con.prepareStatement("SELECT data FROM filme " + where + " ORDER BY data ->> 'sender', data ->> 'thema', data ->> 'titel'");
118+
) {
119+
ps.setFetchSize(50000);
120+
try (ResultSet rs = ps.executeQuery()) {
121+
while (rs.next()) {
122+
String json = rs.getString("data");
123+
list.add(gson.fromJson(json, Film.class));
124+
readCounter++;
125+
}
126+
}
127+
LOG.debug("Filmlist read {} records and imported {} records", readCounter, list.getFilms().size());
128+
return Optional.of(list);
129+
} catch (Exception e) {
130+
LOG.error(e);
131+
}
132+
return Optional.empty();
133+
}
134+
135+
/////////////////////////////////////////////////////////////////////////////////////////
136+
/////////////////////////////////////////////////////////////////////////////////////////
137+
138+
public <T> List<T> filterNewVideos(List<T> videos, Function<T, String> idExtractor) {
139+
try {
140+
List<Future<List<T>>> futures = new ArrayList<>();
141+
142+
List<T> allVideos = videos.stream()
143+
.sorted(Comparator.comparing(idExtractor))
144+
.toList();
145+
146+
for (int i = 0; i < allVideos.size(); i += batchSize) {
147+
int from = i;
148+
int to = Math.min(i + batchSize, allVideos.size());
149+
List<T> batch = allVideos.subList(from, to);
150+
151+
futures.add(executorService.submit(() -> {
152+
List<T> newVideos = new ArrayList<>();
153+
154+
String sql = "UPDATE filme SET last_update = now() WHERE id = ?";
155+
156+
try (Connection con = dataSource.getConnection(); PreparedStatement ps = con.prepareStatement(sql)) {
157+
158+
for (T video : batch) {
159+
String id = idExtractor.apply(video);
160+
if (id != null) {
161+
ps.setString(1, id);
162+
ps.addBatch();
163+
} else {
164+
LOG.error("filterNewVideos - Missing ID for Film {}", video);
165+
}
166+
}
167+
int[] rs = ps.executeBatch();
168+
for (int rsIndex = 0; rsIndex < rs.length; rsIndex++) {
169+
if (rs[rsIndex] == 0) {
170+
newVideos.add(batch.get(rsIndex));
171+
}
172+
}
173+
174+
} catch (SQLException e) {
175+
LOG.error(e);
176+
}
177+
return newVideos;
178+
}));
179+
}
180+
List<T> result = new ArrayList<>();
181+
for (Future<List<T>> f : futures) {
182+
result.addAll(f.get());
183+
}
184+
LOG.debug("Filtered {} (in {} out {})",(videos.size()-result.size()), videos.size(), result.size());
185+
return result;
186+
} catch (Exception e) {
187+
return videos;
188+
}
189+
}
190+
191+
192+
193+
////////////////////////////////////////////////////////////////////////////////////////////////////
194+
////////////////////////////////////////////////////////////////////////////////////////////////////
195+
196+
public HashSet<String> getAllVideoUrls() {
197+
HashSet<String> allVideoUrls = new HashSet<String>();
198+
String sql = """
199+
SELECT
200+
data -> 'urls' -> 'SMALL' ->> 'url' aSmall,
201+
data -> 'urls' -> 'NORMAL' ->> 'url' aNormal,
202+
data -> 'urls' -> 'HD' ->> 'url' aHD
203+
FROM filme
204+
""";
205+
try (Connection con = dataSource.getConnection(); PreparedStatement ps = con.prepareStatement(sql)) {
206+
try (ResultSet rs = ps.executeQuery()) {
207+
while (rs.next()) {
208+
allVideoUrls.add(rs.getString(1));
209+
allVideoUrls.add(rs.getString(2));
210+
allVideoUrls.add(rs.getString(3));
211+
}
212+
}
213+
} catch (SQLException e) {
214+
LOG.error("getAllVideoUrls failed", e);
215+
}
216+
return allVideoUrls;
217+
}
218+
219+
////////////////////////////////////////////////////////////////////////////////////////////////////
220+
////////////////////////////////////////////////////////////////////////////////////////////////////
221+
222+
/**
223+
* Speichert alle Filme einer Filmlist parallel in der DB.
224+
*/
225+
public void saveAll(Filmlist filmlist) throws Exception {
226+
// Map in List konvertieren
227+
List<Film> films = new ArrayList<>(filmlist.getFilms().values());
228+
films = makeUniqueIds(films);
229+
AtomicInteger successCounter = new AtomicInteger(0);
230+
List<Future<?>> futures = new ArrayList<>();
231+
232+
for (int i = 0; i < films.size(); i += batchSize) {
233+
int from = i;
234+
int to = Math.min(i + batchSize, films.size());
235+
List<Film> batch = films.subList(from, to);
236+
237+
futures.add(executorService.submit(() -> {
238+
try {
239+
successCounter.addAndGet(saveBatch(batch));
240+
} catch (SQLException | IOException e) {
241+
LOG.error(e);
242+
}
243+
}));
244+
}
245+
246+
for (Future<?> f : futures) {
247+
f.get();
248+
}
249+
250+
LOG.info("Stored {} films in DB", successCounter.get());
251+
}
252+
253+
/**
254+
* Speichert einen Batch von Filmen als Upsert in der DB.
255+
*/
256+
private int saveBatch(List<Film> films) throws SQLException, IOException {
257+
int successCounter = 0;
258+
259+
String sql = """
260+
INSERT INTO filme (id, data, created_at, last_update)
261+
VALUES (?, ?::jsonb, now(), now())
262+
ON CONFLICT (id) DO UPDATE
263+
SET data = EXCLUDED.data,
264+
last_update = now(),
265+
created_at = filme.created_at
266+
""";
267+
268+
try (Connection con = dataSource.getConnection(); PreparedStatement ps = con.prepareStatement(sql)) {
269+
270+
for (Film film : films) {
271+
if(film.getId() != null) {
272+
ps.setString(1, film.getId());
273+
ps.setString(2, gson.toJson(film));
274+
ps.addBatch();
275+
successCounter++;
276+
} else {
277+
LOG.error("saveBatch - Missing ID for film {}", film);
278+
}
279+
}
280+
281+
ps.executeBatch();
282+
}
283+
return successCounter;
284+
}
285+
286+
private static List<Film> makeUniqueIds(List<Film> films) {
287+
Map<String, AtomicInteger> idCount = new HashMap<>();
288+
289+
return films.stream().map(film -> {
290+
String originalId = film.getId();
291+
AtomicInteger count = idCount.computeIfAbsent(originalId, k -> new AtomicInteger(0));
292+
293+
int c = count.getAndIncrement();
294+
if (c == 0) {
295+
return film; // erste ID bleibt unverändert
296+
} else {
297+
// Duplikat → neue ID mit Suffix #1, #2 ...
298+
film.setId(originalId + "#" + c);
299+
return film;
300+
}
301+
}).collect(Collectors.toList());
302+
}
303+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package de.mediathekview.mserver.base.utils;
2+
3+
import com.zaxxer.hikari.HikariConfig;
4+
import com.zaxxer.hikari.HikariDataSource;
5+
6+
import javax.sql.DataSource;
7+
8+
public final class GPDataSourceProvider {
9+
10+
private static final HikariDataSource DATA_SOURCE;
11+
12+
static {
13+
HikariConfig cfg = new HikariConfig();
14+
15+
// === JDBC ===
16+
cfg.setJdbcUrl("jdbc:postgresql://OscarDS:55432/crawler");
17+
cfg.setUsername("crawler");
18+
cfg.setPassword("secret");
19+
20+
// === Pool Sizing (wichtig!) ===
21+
cfg.setMaximumPoolSize(16); // Sweet Spot für 10k+/min
22+
cfg.setMinimumIdle(4);
23+
24+
// === Performance ===
25+
cfg.setAutoCommit(true);
26+
cfg.setConnectionTimeout(3000);
27+
cfg.setIdleTimeout(600_000);
28+
cfg.setMaxLifetime(1_800_000);
29+
30+
// === PostgreSQL Optimierungen ===
31+
cfg.addDataSourceProperty("reWriteBatchedInserts", "true");
32+
cfg.addDataSourceProperty("stringtype", "unspecified");
33+
34+
// === Debug (optional) ===
35+
cfg.setPoolName("CrawlerPool");
36+
37+
DATA_SOURCE = new HikariDataSource(cfg);
38+
}
39+
40+
private GPDataSourceProvider() {
41+
// no instances
42+
}
43+
44+
public static DataSource get() {
45+
return DATA_SOURCE;
46+
}
47+
48+
public static void shutdown() {
49+
DATA_SOURCE.close();
50+
}
51+
}

0 commit comments

Comments
 (0)