-
Notifications
You must be signed in to change notification settings - Fork 418
Expand file tree
/
Copy pathblob_factory.py
More file actions
45 lines (34 loc) · 1.35 KB
/
blob_factory.py
File metadata and controls
45 lines (34 loc) · 1.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
from typing import Optional
from common.config.config import Config # Load config
from common.logger.app_logger import AppLogger
from common.storage.blob_azure import AzureBlobStorage
from common.storage.blob_base import BlobStorageBase
class BlobStorageFactory:
_instance: Optional[BlobStorageBase] = None
_logger = AppLogger("BlobStorageFactory")
@staticmethod
async def get_storage() -> BlobStorageBase:
if BlobStorageFactory._instance is None:
config = Config()
BlobStorageFactory._instance = AzureBlobStorage(
account_name=config.azure_blob_account_name,
container_name=config.azure_blob_container_name,
)
BlobStorageFactory._logger.info(
f"Initialized Azure Blob Storage: {config.azure_blob_container_name}"
)
return BlobStorageFactory._instance
@staticmethod
async def close_storage() -> None:
if BlobStorageFactory._instance:
BlobStorageFactory._instance = None
# Local testing of config and code
async def main():
storage = await BlobStorageFactory.get_storage()
# Use the storage instance
blob = await storage.get_file("q1_informix.sql")
print("Blob content:", blob)
await BlobStorageFactory.close_storage()
if __name__ == "__main__":
asyncio.run(main())