refactor: rename project to gallery

This commit is contained in:
2024-08-11 20:46:04 +03:00
parent 3785b8ce5d
commit 72fd378c40
48 changed files with 117 additions and 92 deletions

View File

@@ -0,0 +1,96 @@
import datetime
import logging
from typing import Any, Dict, List
import aiohttp
from aiocache import cached
from bs4 import BeautifulSoup
from gallery.sketch.weather.api import WeatherApi
from gallery.sketch.weather.model import WeatherResponse, WeatherValue
from . import datehelp
from .parser import DAYS_PARSER, LOCATION_PARSER, ONE_DAY_PARSER, ROW_PARSERS
logger = logging.getLogger("gismeteo")
class GismeteoApi(WeatherApi):
BASE_URL = "https://www.gismeteo.ru"
CACHE_TTL = 10 * 60
USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Safari/537.36"
)
COOKIE = (
"cf_clearance=U28mYVC0ENu88vorlL_CWmWOoevvXp0vb4xCqfqYC9s-"
"1722273367-1.0.1.1-"
"IDV73azTHY0V.NAnmEvok3zf5HHEkvF098pmya7IiqRRB5nk3FhbLCb0AeWm_kpTFqi1niFk2mYN_ramGTSl0A"
)
async def _request(self, endpoint: str) -> str:
url = f"{self.BASE_URL}/{endpoint}"
logger.info(url)
async with aiohttp.ClientSession(
headers={
"User-Agent": self.USER_AGENT,
"Cookie": self.COOKIE,
},
raise_for_status=True,
) as session:
async with session.request("GET", url) as response:
return await response.text()
def _parse_oneday(self, date: datetime.date, data: str) -> WeatherResponse:
result: List[Dict[str, Any]] = []
soup = BeautifulSoup(data, features="html.parser")
location = LOCATION_PARSER.parse_location(data)
widget = ONE_DAY_PARSER.parse_widget(soup)
for parser in ROW_PARSERS:
for index, value in enumerate(parser.parse_row(widget)):
while len(result) < index + 1:
result.append({})
result[index][parser.KEY] = value
values = [WeatherValue(**item) for item in result]
return WeatherResponse(
location=location or "n/a",
date=date,
period="day",
values=values,
)
def _parse_manydays(self, data: str) -> WeatherResponse:
result: List[Dict[str, Any]] = []
soup = BeautifulSoup(data, features="html.parser")
location = LOCATION_PARSER.parse_location(data)
widget = DAYS_PARSER.parse_widget(soup)
for parser in ROW_PARSERS:
for index, value in enumerate(parser.parse_row(widget)):
while len(result) < index + 1:
result.append({})
result[index][parser.KEY] = value
values = [WeatherValue(**item) for item in result]
return WeatherResponse(
location=location or "n/a",
date=datetime.date.today(),
period="days",
values=values,
)
async def get_locations(self) -> list[str]:
return [
"orel-4432",
"zmiyevka-184640",
]
@cached(ttl=CACHE_TTL)
async def get_day(self, location_id: str, date: datetime.date) -> WeatherResponse:
data = await self._request(f"weather-{location_id}/{datehelp.dump(date)}")
return self._parse_oneday(date, data)
@cached(ttl=CACHE_TTL)
async def get_days(self, location_id: str, days: int) -> WeatherResponse:
data = await self._request(f"weather-{location_id}/{days}-days")
return self._parse_manydays(data)