Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/firebase_functions/db_fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

# pylint: disable=protected-access
import dataclasses as _dataclass
import datetime as _dt
import functools as _functools
import typing as _typing

Expand Down Expand Up @@ -126,10 +125,7 @@ def _db_endpoint_handler(
id=event_attributes["id"],
source=event_attributes["source"],
type=event_attributes["type"],
time=_dt.datetime.strptime(
event_attributes["time"],
"%Y-%m-%dT%H:%M:%S.%f%z",
),
time=_util.timestamp_conversion(event_attributes["time"]),
data=database_event_data,
subject=event_attributes["subject"],
params=params,
Expand Down
13 changes: 7 additions & 6 deletions src/firebase_functions/private/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,15 @@ def __str__(self) -> str:

def get_precision_timestamp(time: str) -> PrecisionTimestamp:
"""Return a bool which indicates if the timestamp is in nanoseconds"""
# Split the string into date-time and fraction of second
try:
_, s_fraction = time.split(".")
except ValueError:
if "." not in time:
Comment thread
CorieW marked this conversation as resolved.
return PrecisionTimestamp.SECONDS

# Split the fraction from the timezone specifier ('Z' or 'z')
s_fraction, _ = s_fraction.split("Z") if "Z" in s_fraction else s_fraction.split("z")
_, s_fraction = time.split(".", 1)
fraction_match = _re.match(r"\d+", s_fraction)
if fraction_match is None:
raise ValueError("Invalid timestamp")

s_fraction = fraction_match.group()
Comment thread
CorieW marked this conversation as resolved.
Outdated

# If the fraction is more than 6 digits long, it's a nanosecond timestamp
if len(s_fraction) > 6:
Expand Down
35 changes: 35 additions & 0 deletions tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Tests for the db module.
"""

import datetime as dt
import unittest
from unittest import mock

Expand Down Expand Up @@ -81,3 +82,37 @@ def test_missing_auth_context(self):
self.assertIsNotNone(event_arg)
self.assertEqual(event_arg.auth_type, "unknown")
self.assertIsNone(event_arg.auth_id)

def test_written_event_parses_timestamp_without_microseconds(self):
func = mock.Mock(__name__="example_func_no_microseconds")
decorated_func = db_fn.on_value_written(reference="/items/{itemId}")(func)

event = CloudEvent(
attributes={
"specversion": "1.0",
"id": "issue-257-repro",
"source": "//firebase.test/projects/demo-test/instances/my-instance/refs/items/123",
"subject": "refs/items/123",
"type": "google.firebase.database.ref.v1.written",
"time": "2025-10-30T21:15:51Z",
"instance": "my-instance",
"ref": "/items/123",
"firebasedatabasehost": "my-instance.firebaseio.com",
"location": "location",
},
data={
"data": {"existing": True},
"delta": {"updated": True},
},
)

decorated_func(event)

func.assert_called_once()
event_arg = func.call_args.args[0]
self.assertEqual(
event_arg.time,
dt.datetime(2025, 10, 30, 21, 15, 51, tzinfo=dt.timezone.utc),
)
self.assertEqual(event_arg.data.after, {"existing": True, "updated": True})
self.assertEqual(event_arg.params, {"itemId": "123"})
54 changes: 54 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
nanoseconds_timestamp_conversion,
normalize_path,
second_timestamp_conversion,
timestamp_conversion,
)

test_bucket = "python-functions-testing.appspot.com"
Expand Down Expand Up @@ -107,6 +108,59 @@ def test_second_conversion():
assert second_timestamp_conversion(input_timestamp) == expected_datetime


def test_timestamp_conversion_supported_formats():
"""
Testing shared timestamp conversion handles supported RTDB and CloudEvent formats.
"""
timestamps = [
(
"2024-04-10T12:00:00.000Z",
_dt.datetime(2024, 4, 10, 12, 0, tzinfo=_dt.timezone.utc),
),
(
"2024-04-10T12:00:00.123456Z",
_dt.datetime(2024, 4, 10, 12, 0, 0, 123456, tzinfo=_dt.timezone.utc),
),
(
"2024-04-10T12:00:00.123456+05:30",
_dt.datetime(
2024,
4,
10,
12,
0,
0,
123456,
tzinfo=_dt.timezone(_dt.timedelta(hours=5, minutes=30)),
),
),
(
"2024-04-10T12:00:00.123456-0700",
_dt.datetime(
2024,
4,
10,
12,
0,
0,
123456,
tzinfo=_dt.timezone(-_dt.timedelta(hours=7)),
),
),
(
"2023-01-01T12:34:56.123456789Z",
_dt.datetime(2023, 1, 1, 12, 34, 56, 123456, tzinfo=_dt.timezone.utc),
),
(
"2025-10-30T21:15:51Z",
_dt.datetime(2025, 10, 30, 21, 15, 51, tzinfo=_dt.timezone.utc),
),
]

for input_timestamp, expected_datetime in timestamps:
assert timestamp_conversion(input_timestamp) == expected_datetime


def test_is_nanoseconds_timestamp():
"""
Testing is_nanoseconds_timestamp works as intended
Expand Down
Loading