# # app_test/tasks.py # from celery import shared_task # import datetime # import logging # # logger = logging.getLogger(__name__) # # @shared_task # def write_to_file_task(): # current_time = datetime.datetime.now() # message = f"Task is running at {current_time}" # logger.info(message) # print(message) # # # 写入数据到文件 # with open('data_output.txt', 'a') as f: # f.write(f"{message}\n") # # return "Task Completed" # @shared_task # def test_task(): # current_time = datetime.datetime.now() # logger.info(f"Test task started at {current_time}") # print(f"Test task started at {current_time}") # # with open('celery_task_log.txt', 'a') as f: # f.write(f"Test task started at {current_time}\n") # # time.sleep(2) # 模拟2秒的任务执行时间 # # current_time = datetime.datetime.now() # logger.info(f"Test task ended at {current_time}") # print(f"Test task ended at {current_time}") # # with open('celery_task_log.txt', 'a') as f: # f.write(f"Test task ended at {current_time}\n") # # @shared_task # def my_scheduled_task(): # current_time = datetime.datetime.now() # logger.info(f"My scheduled task started at {current_time}") # print(f"My scheduled task started at {current_time}") # # 模拟任务执行时间 # import time # time.sleep(2) # 模拟2秒的任务执行时间 # current_time = datetime.datetime.now() # logger.info(f"My scheduled task ended at {current_time}") # print(f"My scheduled task ended at {current_time}")