Beracles committed on
Commit 1c7b6d3 · 1 Parent(s): 88be5d0

Optimize log management: load the last 30 days of logs and add date-range validation

Files changed (3)
  1. logging_helper.py +155 -159
  2. main.py +4 -4
  3. utils.py +40 -0
logging_helper.py CHANGED
@@ -5,10 +5,11 @@ a module of logs saving and backuping
 import os
 import datasets as ds
 from apscheduler.schedulers.background import BackgroundScheduler
-from tqdm import tqdm
 from utils import beijing, md5, json_to_str
 from huggingface_hub import HfApi
 import pandas as pd
+import datetime
+from zoneinfo import ZoneInfo
 import glob
 
 hf = HfApi()
@@ -22,6 +23,7 @@ class LoggingHelper:
         repo_id: str,
         local_dir: str = "data/logs",
         synchronize_interval: int = 60,
+        cache_days: int = 30,
     ):
         """
         :param repo_id: the repo_id of the dataset in huggingface
@@ -29,6 +31,7 @@ class LoggingHelper:
         :param synchronize_interval: the interval of synchronizing between local and huggingface
 
         """
+        self.cache_days = cache_days
        self.local_dir = local_dir
        self.repo_id = repo_id
        self.synchronize_interval = synchronize_interval
@@ -39,11 +42,11 @@ class LoggingHelper:
         self.today = beijing().date()
         ds.disable_progress_bar()
         self.dataframe: pd.DataFrame
-        # cache-related state
-        self.cached_df: pd.DataFrame | None = None
-        self.loaded_files: set[str] = set()
-        self.cache_needs_refresh = False
+        self.dataframe_refresh_needed = True
+        # download all the data first
         self.pull()
+        # load the last 30 days of logs into memory
+        self.load_logs()
         self.start_synchronize()
 
     def addlog(self, log: dict):
@@ -57,6 +60,7 @@ class LoggingHelper:
             self.buffer[remotepath] = ds.Dataset.from_dict({})
         self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
         self.need_push[remotepath] = True
+        self.dataframe_refresh_needed = True
         print("[addlog] Added a log to buffer")
 
     def remotedir(self):
@@ -66,36 +70,6 @@ class LoggingHelper:
         day = now.day.__str__()
         return "/".join([year, month, day])
 
-    def pull(self):
-        try:
-            self.download()
-            remotedir = self.remotedir()
-            print(f"[pull] today dir: {remotedir}")
-            filenames = hf.list_repo_files(
-                repo_id=self.repo_id,
-                repo_type=self.repo_type,
-            )
-            files_to_load = [
-                filename
-                for filename in filenames
-                if filename not in self.buffer
-                and filename.startswith(remotedir)
-                and filename.endswith(".json")
-            ]
-            print(f"[pull] total {len(files_to_load)} to load")
-            for filename in tqdm(files_to_load):
-                print()
-                path = os.sep.join([self.local_dir, filename])
-                with open(path, "r") as f:
-                    data = f.read()
-                if len(data) != 0:
-                    self.buffer[filename] = ds.Dataset.from_json(path)  # type: ignore
-                    self.need_push[filename] = False
-            return True
-        except Exception as e:
-            print(f"[pull] {type(e)}: {e}")
-            return False
-
     def push_yesterday(self) -> bool:
         try:
             year = self.today.year.__str__()
@@ -106,9 +80,6 @@ class LoggingHelper:
             for filename in self.buffer.keys():
                 if not filename.startswith(remotedir):
                     continue
-                if not self.need_push[filename]:
-                    del self.buffer[filename]
-                    del self.need_push[filename]
                 files_to_push.append(filename)
             if len(files_to_push) == 0:
                 return True
@@ -168,25 +139,116 @@ class LoggingHelper:
                 self.need_push[filename] = False
             print(f"[push] Log files pushed to {res}")
             print("[push] Done")
-            # mark the cache as needing a refresh
-            self.cache_needs_refresh = True
             return True
         except Exception as e:
             print(f"[push] {type(e)}: {e}")
             return False
 
-    def download(self):
-        print("[download] Starting downloading")
+    def pull(self):
+        print("[pull] Starting downloading")
         try:
             res = hf.snapshot_download(
                 repo_id=self.repo_id,
                 repo_type="dataset",
                 local_dir=self.local_dir,
             )
-            print(f"[download] Downloaded to {res}")
+            print(f"[pull] Downloaded to {res}")
+        except Exception as e:
+            print(f"[pull] {type(e)}: {e}")
+        print("[pull] Done")
+
+    def get_pathes_between(
+        self, from_date: datetime.date, to_date: datetime.date
+    ) -> list[str]:
+        """
+        Get the list of date paths within the given range.
+
+        :param from_date: start date (datetime.date), inclusive
+        :param to_date: end date (datetime.date), inclusive
+        :return: list of paths in the range, formatted as ["YYYY/MM/DD", ...]
+        """
+        pathes = []
+        current_date = from_date
+        while current_date <= to_date:
+            pathes.append(current_date.strftime("%Y/%m/%d"))
+            current_date += datetime.timedelta(days=1)
+        return pathes
+
+    def load_logs(self):
+        """
+        Load the most recent 30 days of logs into the in-memory buffer at startup.
+        """
+        print("[load_logs] Starting to load recent 30 days logs")
+
+        try:
+            today = beijing().date()
+            start_date = today - datetime.timedelta(days=self.cache_days)
+            print(f"Loading logs from {start_date} to {today}")
+            # build the date range covering the last 30 days
+            pathes = self.get_pathes_between(start_date, today)
+            total_files_loaded = 0
+
+            # walk each day's logs
+            for path in pathes:
+                date_path = os.sep.join([self.local_dir, path])
+                print(f"[load_logs] Processing directory: {date_path}")
+                # check whether the directory for this date exists
+                if not os.path.exists(date_path):
+                    print(f"[load_logs] Directory not found: {date_path}")
+                    continue
+                # load every JSON file in this directory
+                json_files = glob.glob(os.path.join(date_path, "*.json"))
+
+                for json_file in json_files:
+                    # use the path relative to local_dir as the buffer key
+                    relative_path = os.path.relpath(json_file, self.local_dir).replace(
+                        os.sep, "/"
+                    )
+                    try:
+                        # skip empty files
+                        if os.path.getsize(json_file) == 0:
+                            print(f"[load_logs] Skipping empty file: {relative_path}")
+                            continue
+
+                        # load the JSON data into a Dataset
+                        dataset = ds.Dataset.from_json(json_file)
+                        if isinstance(dataset, ds.Dataset):
+                            self.buffer[relative_path] = dataset
+                            self.need_push[relative_path] = False
+                            total_files_loaded += 1
+                    except Exception as e:
+                        print(f"[load_logs] Error loading {relative_path}: {e}")
+                        continue
+            print(f"[load_logs] Successfully loaded {total_files_loaded} log files")
+            print(f"[load_logs] Total datasets in buffer: {len(self.buffer)}")
         except Exception as e:
-            print(f"[download] {type(e)}: {e}")
-        print("[download] Done")
+            print(f"[load_logs] Error: {type(e)}: {e}")
+
+    def cleanup_old_logs(self):
+        """Remove logs older than 30 days from the in-memory buffer."""
+        try:
+            print("[cleanup_old_logs] Starting cleanup of old logs")
+            cache_dir_to_remove = (
+                self.today - datetime.timedelta(days=(self.cache_days + 1))
+            ).strftime("%Y/%m/%d")
+            print(
+                f"[cleanup_old_logs] Removing logs in {cache_dir_to_remove} from buffer"
+            )
+            removed_count = 0
+            for filepath in list(self.buffer.keys()):
+                if filepath.startswith(cache_dir_to_remove):
+                    del self.buffer[filepath]
+                    del self.need_push[filepath]
+                    removed_count += 1
+
+            print(f"[cleanup_old_logs] Cleaned up {removed_count} old log files")
+            print(
+                f"[cleanup_old_logs] Remaining datasets in buffer: {len(self.buffer)}"
+            )
+            print("[cleanup_old_logs] Done")
+
+        except Exception as e:
+            print(f"[cleanup_old_logs] Error: {type(e)}: {e}")
 
     def start_synchronize(self):
         self.scheduler.add_job(
@@ -194,130 +256,64 @@ class LoggingHelper:
             "interval",
             seconds=self.synchronize_interval,
         )
+        # add a daily cleanup job that runs at 2:00 AM
+        self.scheduler.add_job(
+            self.cleanup_old_logs,
+            "cron",
+            hour=2,
+            minute=0,
+        )
         self.scheduler.start()
 
-    def _load_all_logs(self, from_date=None, to_date=None) -> pd.DataFrame:
+    def refresh_dataframe(self) -> pd.DataFrame:
         """
-        Load the log files and return the merged DataFrame.
-
-        Uses direct path construction to efficiently look up files in a specific date range.
-
-        :param from_date: start date (YYYY-MM-DD or datetime.date), defaults to None
-        :param to_date: end date (YYYY-MM-DD or datetime.date), defaults to None
+        Load the last 30 days of log files and return the merged DataFrame.
         """
-        import datetime
-
-        print("[_load_all_logs] Starting to load logs")
-        print(f"[_load_all_logs] Date range: {from_date} to {to_date}")
-
-        filepathes = []
-
-        # determine the date range
-        if from_date is None and to_date is None:
-            # no range given: scan every directory
-            files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True)
-            filepathes = [os.path.join(self.local_dir, file) for file in files]
-        else:
-            # convert the date arguments to datetime.date objects
-            start_date = from_date
-            end_date = to_date
-
-            if isinstance(start_date, str):
-                start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
-            if isinstance(end_date, str):
-                end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
-
-            # if only one date was given, default the other to it
-            if start_date is None:
-                start_date = end_date
-            if end_date is None:
-                end_date = start_date
-
-            # type check to ensure neither date is None
-            if start_date is not None and end_date is not None:
-                # construct the directory paths in the date range directly, avoiding a glob walk
-                current_date = start_date
-                date_dirs = []
-                while current_date <= end_date:
-                    year = str(current_date.year)
-                    month = str(current_date.month)
-                    day = str(current_date.day)
-                    date_dir = os.path.join(self.local_dir, year, month, day)
-                    date_dirs.append((date_dir, year, month, day))
-                    current_date += datetime.timedelta(days=1)
-
-                print(
-                    f"[_load_all_logs] Constructed {len(date_dirs)} date directories"
-                )
-
-                # look for JSON files in the selected date directories
-                for date_dir, year, month, day in date_dirs:
-                    if os.path.isdir(date_dir):
-                        json_files = glob.glob("*.json", root_dir=date_dir)
-                        for json_file in json_files:
-                            filepathes.append(os.path.join(date_dir, json_file))
-
-                print(f"[_load_all_logs] Found {len(filepathes)} files in date range")
-
-        # load all the log files
-        datasets = []
-        for path in tqdm(filepathes):
-            path = str(path)
-            try:
-                datasets.append(ds.Dataset.from_json(path))
-            except Exception as e:
-                print(f"[_load_all_logs] Error loading {path}: {e}")
-                continue
-
-        # merge the datasets and sort
-        df = pd.DataFrame()
-        if datasets:
-            dataset: ds.Dataset = ds.concatenate_datasets(datasets)
-            df = dataset.to_pandas()
-            assert isinstance(df, pd.DataFrame)
-            df = df.sort_values(by="timestamp", ascending=False)
-
-        print(f"[_load_all_logs] Loaded {len(df)} logs")
-        self.loaded_files = set([os.path.relpath(p, self.local_dir) for p in filepathes])
-        return df
+        datasets = list(self.buffer.values())
+        merged_dataset = ds.concatenate_datasets(datasets)
+        self.dataframe = merged_dataset.to_pandas()  # type: ignore
+        print(f"[refresh_dataframe] Loaded {len(self.dataframe)} logs")  # type: ignore
+        self.dataframe_refresh_needed = False
+        return self.dataframe  # type: ignore
 
     def refresh(self, from_date=None, to_date=None) -> list[dict]:
         """
-        Get the refreshed log list, with optional date-range filtering.
+        Get the refreshed log list by merging the Datasets held in the in-memory
+        buffer, with optional date-range filtering.
 
-        :param from_date: start date (YYYY-MM-DD or datetime.date), defaults to None
-        :param to_date: end date (YYYY-MM-DD or datetime.date), defaults to None
-        :return: list of log dicts
-        """
-        import datetime
+        Filtering is based on the timestamp field, an ISO 8601 string
+        (e.g. "2025-09-08T16:01:07.526954+08:00").
 
-        self.push()
+        :param from_date: start date (YYYY-MM-DD or datetime.date); all logs on that day are included
+        :param to_date: end date (YYYY-MM-DD or datetime.date); all logs on that day are included
+        :return: list of log dicts sorted by timestamp in descending order
+        """
+        if self.dataframe_refresh_needed:
+            self.refresh_dataframe()
 
+        df = self.dataframe
         # convert string dates to datetime objects
+        tz = ZoneInfo("Asia/Shanghai")
         if isinstance(from_date, str):
-            from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d").date()
+            from_date = (
+                datetime.datetime.strptime(from_date, "%Y-%m-%d")
+                .astimezone(tz)
+                .isoformat()
+            )
+            from_date = str(from_date)
         if isinstance(to_date, str):
-            to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d").date()
-
-        # with no date range given, use the cache
-        if from_date is None and to_date is None:
-            # reload all logs when the cache is stale or empty
-            if self.cache_needs_refresh or self.cached_df is None:
-                print("[refresh] Cache miss, reloading all logs")
-                self.cached_df = self._load_all_logs()
-                self.cache_needs_refresh = False
-            else:
-                print("[refresh] Using cached data")
-
-            # return the cached DataFrame
-            if self.cached_df is None or self.cached_df.empty:
-                return []
-
-            return self.cached_df.to_dict(orient="records")
-        else:
-            # a date range was given: load directly without the cache
-            print("[refresh] Date range specified, loading without cache")
-            df = self._load_all_logs(from_date=from_date, to_date=to_date)
-            if df is None or df.empty:
-                return []
-            return df.to_dict(orient="records")
+            to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d").astimezone(tz)
+            to_date += datetime.timedelta(days=1)  # include the whole end day
+            to_date = to_date.isoformat()
+        print(f"[refresh] Filtering logs from {from_date} to {to_date}")
+        # filter by timestamp range (covering the full boundary days)
+        if from_date is not None or to_date is not None:
+            # build the date-range filter mask
+            filter_condition = pd.Series([True] * len(df), index=df.index)
+            if from_date is not None:
+                filter_condition = filter_condition & (df["timestamp"] >= from_date)
+            if to_date is not None:
+                filter_condition = filter_condition & (df["timestamp"] < to_date)
+            df = df[filter_condition]
+        # sort by timestamp in descending order (newest logs first)
+        df = df.sort_values(by="timestamp", ascending=False)
+        print(f"[refresh] Returning {len(df)} logs")
+        return df.to_dict(orient="records")
 
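After this change, reads come from the in-memory buffer instead of a disk glob: refresh() merges the buffered Datasets once, caches the result in self.dataframe, and filters on the ISO 8601 timestamp strings. A minimal usage sketch of the reworked class, assuming a hypothetical your-org/service-logs dataset repo and an authenticated Hub session:

from logging_helper import LoggingHelper
from utils import beijing

# hypothetical repo id; any Hugging Face dataset repo you can read and write works here
logger = LoggingHelper(repo_id="your-org/service-logs", cache_days=30)

# buffered write; the background scheduler pushes it on the next sync tick
logger.addlog({"timestamp": beijing().isoformat(), "event": "ping"})

# date-range read: both boundary days are included in full
logs = logger.refresh(from_date="2025-09-01", to_date="2025-09-08")
print(f"{len(logs)} logs in range")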
main.py CHANGED
@@ -75,15 +75,15 @@ async def root(
     Home endpoint with date-range query support
 
     Query parameters:
-    - from_date: start date (YYYY-MM-DD); when omitted, defaults to loading the last 7 days
+    - from_date: start date (YYYY-MM-DD); when omitted, defaults to loading the last 30 days
     - to_date: end date (YYYY-MM-DD); when omitted, defaults to today
     """
-    # with no date range given, default to the last 7 days of logs
+    # with no date range given, default to the last 30 days of logs
     if from_date is None and to_date is None:
         today = beijing().date()
-        from_date = str(today - datetime.timedelta(days=6))  # last 7 days, today included
+        from_date = str(today - datetime.timedelta(days=29))  # last 30 days, today included
         to_date = str(today)
-        print(f"[root] No date range specified, using last 7 days: {from_date} to {to_date}")
+        print(f"[root] No date range specified, using last 30 days: {from_date} to {to_date}")
 
     data = logger.refresh(from_date=from_date, to_date=to_date)
     return templates.TemplateResponse(
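refresh() compares the query bounds against the timestamp column as plain strings. That works because ISO 8601 timestamps with a fixed UTC offset sort lexicographically in chronological order; a small self-contained check of that assumption (the sample rows are made up):

import pandas as pd

df = pd.DataFrame({"timestamp": [
    "2025-09-08T16:01:07.526954+08:00",
    "2025-08-15T09:30:00.000000+08:00",
    "2025-09-01T00:00:00.000000+08:00",
]})

# half-open window [from, to), matching the filter in refresh()
from_ts, to_ts = "2025-09-01T00:00:00+08:00", "2025-09-09T00:00:00+08:00"
mask = (df["timestamp"] >= from_ts) & (df["timestamp"] < to_ts)
print(df[mask].sort_values("timestamp", ascending=False))
# expect the two September rows, newest first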
utils.py CHANGED
@@ -61,3 +61,43 @@ def md5(text: list[str | bytes] | str | bytes | None = None) -> str:
 
 def json_to_str(obj: dict | list) -> str:
     return json.dumps(obj, separators=(",", ":"))
+
+
+def validate_date_format(date_str: str, format_str: str = "%Y-%m-%d") -> bool:
+    """
+    Validate that a date string matches the expected format.
+
+    :param date_str: the date string to validate
+    :param format_str: the expected date format (default: YYYY-MM-DD)
+    :return: True if the format is valid, False otherwise
+    """
+    if not date_str:
+        return True  # empty values are considered valid (optional parameter)
+
+    try:
+        from datetime import datetime as dt
+        dt.strptime(date_str, format_str)
+        return True
+    except ValueError:
+        return False
+
+
+def parse_date_range(from_date: str | None, to_date: str | None) -> tuple[str | None, str | None]:
+    """
+    Parse and validate a date range.
+
+    :param from_date: start date (YYYY-MM-DD)
+    :param to_date: end date (YYYY-MM-DD)
+    :return: the validated (from_date, to_date) tuple
+    :raises ValueError: if a date is malformed or the range is invalid
+    """
+    if from_date and not validate_date_format(from_date):
+        raise ValueError(f"Invalid from_date format: {from_date}")
+
+    if to_date and not validate_date_format(to_date):
+        raise ValueError(f"Invalid to_date format: {to_date}")
+
+    if from_date and to_date and from_date > to_date:
+        raise ValueError(f"from_date ({from_date}) cannot be after to_date ({to_date})")
+
+    return from_date, to_date
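The new helpers are pure functions over strings, so validation can happen before any date math; note that zero-padded YYYY-MM-DD strings also compare correctly as plain strings, which is what the range check relies on. A quick behavior sketch (this commit does not yet wire parse_date_range into main.py):

from utils import parse_date_range, validate_date_format

print(validate_date_format("2025-09-08"))  # True
print(validate_date_format("2025/09/08"))  # False: wrong separator
print(validate_date_format(""))            # True: empty means "not provided"

try:
    parse_date_range("2025-09-08", "2025-09-01")
except ValueError as err:
    print(err)  # from_date (2025-09-08) cannot be after to_date (2025-09-01)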