|
2 | 2 | # Date: 2025/9/9 |
3 | 3 |
|
4 | 4 | import json |
| 5 | +from base64 import b64encode |
5 | 6 |
|
6 | 7 | import requests |
7 | 8 | from elasticsearch import Elasticsearch |
8 | | -from fastapi import HTTPException |
9 | 9 |
|
10 | 10 | from apps.datasource.models.datasource import DatasourceConf |
11 | 11 |
|
@@ -60,29 +60,55 @@ def get_es_fields(conf: DatasourceConf, table_name: str): |
60 | 60 | return res |
61 | 61 |
|
62 | 62 |
|
63 | | -def get_es_data(conf: DatasourceConf, sql: str, table_name: str): |
64 | | - r = requests.post(f"{conf.host}/_sql/translate", json={"query": sql}) |
65 | | - if r.json().get('error'): |
66 | | - print(json.dumps(r.json())) |
| 63 | +# def get_es_data(conf: DatasourceConf, sql: str, table_name: str): |
| 64 | +# r = requests.post(f"{conf.host}/_sql/translate", json={"query": sql}) |
| 65 | +# if r.json().get('error'): |
| 66 | +# print(json.dumps(r.json())) |
| 67 | +# |
| 68 | +# es_client = get_es_connect(conf) |
| 69 | +# response = es_client.search( |
| 70 | +# index=table_name, |
| 71 | +# body=json.dumps(r.json()) |
| 72 | +# ) |
| 73 | +# |
| 74 | +# # print(response) |
| 75 | +# fields = get_es_fields(conf, table_name) |
| 76 | +# res = [] |
| 77 | +# for hit in response.get('hits').get('hits'): |
| 78 | +# item = [] |
| 79 | +# if 'fields' in hit: |
| 80 | +# result = hit.get('fields') # {'title': ['Python'], 'age': [30]} |
| 81 | +# for field in fields: |
| 82 | +# v = result.get(field[0]) |
| 83 | +# item.append(v[0]) if v else item.append(None) |
| 84 | +# res.append(tuple(item)) |
| 85 | +# # print(hit['fields']['title'][0]) |
| 86 | +# # elif '_source' in hit: |
| 87 | +# # print(hit.get('_source')) |
| 88 | +# return res, fields |
67 | 89 |
|
68 | | - es_client = get_es_connect(conf) |
69 | | - response = es_client.search( |
70 | | - index=table_name, |
71 | | - body=json.dumps(r.json()) |
72 | | - ) |
73 | 90 |
|
74 | | - # print(response) |
75 | | - fields = get_es_fields(conf, table_name) |
76 | | - res = [] |
77 | | - for hit in response.get('hits').get('hits'): |
78 | | - item = [] |
79 | | - if 'fields' in hit: |
80 | | - result = hit.get('fields') # {'title': ['Python'], 'age': [30]} |
81 | | - for field in fields: |
82 | | - v = result.get(field[0]) |
83 | | - item.append(v[0]) if v else item.append(None) |
84 | | - res.append(tuple(item)) |
85 | | - # print(hit['fields']['title'][0]) |
86 | | - # elif '_source' in hit: |
87 | | - # print(hit.get('_source')) |
88 | | - return res, fields |
| 91 | +def get_es_data_by_http(conf: DatasourceConf, sql: str): |
| 92 | + url = conf.host |
| 93 | + while url.endswith('/'): |
| 94 | + url = sql[:-1] |
| 95 | + |
| 96 | + host = f'{url}/_sql?format=json' |
| 97 | + username = f"{conf.username}" |
| 98 | + password = f"{conf.password}" |
| 99 | + |
| 100 | + credentials = f"{username}:{password}" |
| 101 | + encoded_credentials = b64encode(credentials.encode()).decode() |
| 102 | + |
| 103 | + headers = { |
| 104 | + "Content-Type": "application/json", |
| 105 | + "Authorization": f"Basic {encoded_credentials}" |
| 106 | + } |
| 107 | + |
| 108 | + response = requests.post(host, data=json.dumps({"query": sql}), headers=headers) |
| 109 | + |
| 110 | + # print(response.json()) |
| 111 | + res = response.json() |
| 112 | + fields = res.get('columns') |
| 113 | + result = res.get('rows') |
| 114 | + return result, fields |
0 commit comments