Week 7 of 2023: A New Journey

This weekly report shares what I learned, read, heard, and felt during the week of 2023-02-13 to 2023-02-19, along with interesting things from work and life. It is published every Sunday evening and synced to my blog.

Observations

An Integrated Payment Solution for Indie Developers

This past week I have been tinkering with an RSS reader. Very few existing tools can actively discover independent blogs and let you subscribe to anything, and even fewer feel smooth to use, so I decided to build one myself. But server costs are significant, which means the app needs paid subscriptions, and as an indie developer the barrier to opening a payment channel is very high. After a long search I found a workable option: v2geek (though its 5% fee is still a bit steep). It covers my basic needs, but it offers no developer API, so it cannot close the loop with my software. I therefore used a scraper to bridge the platform and the app: a user pays the subscription fee in the app and immediately receives a license code, with no manual steps in between. The scraper below implements the whole loop.

#coding=utf-8
import requests
import re
from bs4 import BeautifulSoup
import uuid

class V2GeeKPay(object):
    def __init__(self, cookies, project=""):
        self.cookies = cookies
        self.headers = {
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "zh-CN,zh;q=0.9,und;q=0.8,zh-Hans;q=0.7,en;q=0.6",
            "cache-control": "max-age=0",
            "cookie": "_sessions={}".format(cookies),
            "referer": "https://v2geek.com/udmin/projects",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "Linux",
            "sec-fetch-dest": "document",
            "sec-fetch-mode": "navigate",
            "sec-fetch-site": "same-origin",
            "sec-fetch-user": "?1",
            "upgrade-insecure-requests": "1",
        }
        flag, self.project_list, self.user_name = self.get_project_list()
        if not flag:
            raise ValueError("cookie过期,请检查")
        if project != "":
            self.set_project(project)

    def set_project(self, project):
        project_exist = False
        for project_obj in self.project_list:
            if project_obj["key"] == project:
                self.pay_js = "https://v2geek.com/{}/{}/widget.js".format(self.user_name, project)
                self.login_url = "https://v2geek.com/users/sign_in"
                self.import_url = "https://v2geek.com/udmin/projects/{}/licenses/import".format(project)
                self.project_url = "https://v2geek.com/udmin/projects/{}".format(project)
                self.order_url = "https://v2geek.com/udmin/orders"
                self.project = project
                project_exist = True
                break
        if not project_exist:
            raise ValueError("当前用户不存在此项目,请检查")

    def get_project_list(self):
        """获取当前用户的项目以及用户名"""
        project_list_url = "https://v2geek.com/udmin/projects"
        session = requests.Session()
        try:
            response = str(session.get(project_list_url, headers=self.headers, verify=False).content, encoding="utf-8")
            html_obj = BeautifulSoup(response, "html.parser")
            tr_list = html_obj.select("tbody")[0].select("tr")
            dict_project_info = {
                0: "name",
                1: "price"
            }
            user_name = ""
            project_list = []
            for tr in tr_list:
                temp = {}
                for idx, td in enumerate(tr.select("td")):
                    if idx in dict_project_info:
                        pro_key = dict_project_info[idx]
                        temp[pro_key] = td.text.strip().replace("¥","").replace("元","")
                        if pro_key == "name":
                            key_list = td.select("a")[0].get("href").split("/")
                            if user_name == "":  # capture the username once, from the first project link
                                user_name = key_list[1]
                            temp["key"] = key_list[2]
                project_list.append(temp)
            return True, project_list, user_name
        except Exception:
            return False, [], ""

    def generate_auth_token(self, numbers=100):
        """Generate `numbers` random auth tokens using uuid4."""
        return [uuid.uuid4().hex for _ in range(numbers)]

    def import_auth_token(self, auth_token_list):
        # Import a batch of generated auth tokens into the platform so the
        # pool of license codes keeps growing.
        # Fetch the current CSRF token first; it is required by the import form.
        saved_token_list, unused_token, rest_token = self.get_auth_token_list()
        if rest_token != "":
            data = {
                "utf8": (None, "✓"),
                "authenticity_token": (None, rest_token),
                "licenses": (None, "\n".join(auth_token_list)),
                "commit": (None, "提交")
            }
            session = requests.Session()
            session.post(self.import_url, data=data, headers=self.headers, verify=False)
            print("Import succeeded")
            return True, "success"
        else:
            print("Cookie has expired")
            return False, "Cookie has expired, please contact the engineer"

    def get_auth_token_list(self):
        """
        获取账户的相关信息,以便能够实时进行激活处理
        获取成功则返回当前的token,可以用于导入auth_token
        """
        session = requests.Session()
        try:
            response = str(session.get(self.project_url, headers=self.headers, verify=False).content, encoding="utf-8")
            html_obj_first = BeautifulSoup(response, "html.parser")
            page = html_obj_first.select(".pagination")
            meta_list = html_obj_first.select("meta")
            rest_token = ""
            html_obj_list = [html_obj_first]
            for meta in meta_list:
                if meta.get("name") == "csrf-token":
                    rest_token = meta.get("content")
            if len(page):
                page_num = int(page[0].select("a")[-1].get("href").split("=")[1].strip())+1
                for i in range(2, page_num):
                    response = str(session.get(self.project_url+"?page={}".format(i), headers=self.headers, verify=False).content, encoding="utf-8")
                    html_obj_copy = BeautifulSoup(response, "html.parser")
                    html_obj_list.append(html_obj_copy)
            dict_order_info = {
                0: "auth_token",
                1: "use_status",
                2: "order_id",
            }
            auth_token_list = []
            temp = {}
            for html_obj in html_obj_list:
                for tr in html_obj.select("tbody")[0].select("tr"):
                    if "license" in tr.get("id"):
                        temp = {
                            "license_id": tr.get("id")
                        }
                        for idx, td in enumerate(tr.select("td")):
                            key_id = idx % 4
                            if key_id in dict_order_info:
                                temp[dict_order_info[key_id]] = td.text.strip()
                        auth_token_list.append(temp)
            # Count how many tokens are still unassigned. Generating replacements
            # is deliberately left to the other business system.
            unused_token = []
            for auth_token in auth_token_list:
                if auth_token.get("order_id") == "-":
                    unused_token.append(auth_token)
            return auth_token_list, unused_token, rest_token
        except Exception:
            return [], [], ""

    def verify_order_token(self, order_id):
        """验证订单号有没有被分配授权码,如果有则返回True和授权码
        """
        auth_token_list, unused_token, rest_token = self.get_auth_token_list()
        for auth_token in auth_token_list:
            if auth_token.get("order_id") == order_id:
                return True, auth_token.get("auth_token")
        return False,""

    def query_order_status(self, order_id):
        """查询当前订单状态"""
        headers = { 'content-type': 'application/json','User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36'}
        order_status_url = "https://v2geek.com/{user_name}/{project}/checkouts/{order_id}/status".format(user_name=self.user_name, project=self.project, order_id=order_id)
        session = requests.Session()
        res = session.get(order_status_url, headers=headers, verify=False)
        html_content = str(res.content, encoding='utf8')
        return html_content.strip()

    def query_order_list(self):
        """查询订单列表
        """
        session = requests.Session()
        res = session.get(self.order_url, headers=self.headers, verify=False)
        html_content = str(res.content, encoding='utf8')
        html_obj_first = BeautifulSoup(html_content, "html.parser")
        try:
            meta_list = html_obj_first.select("meta")
            rest_token = ""
            html_obj_list = [html_obj_first]
            for meta in meta_list:
                if meta.get("name") == "csrf-token":
                    rest_token = meta.get("content")
            page = html_obj_first.select(".pagination")
            if len(page):
                page_num = int(page[0].select("a")[-1].get("href").split("=")[1].strip())+1
                for i in range(2, page_num):
                    response = str(session.get(self.order_url+"?page={}".format(i), headers=self.headers, verify=False).content, encoding="utf-8")
                    html_obj_copy = BeautifulSoup(response, "html.parser")
                    html_obj_list.append(html_obj_copy)
            
            dict_order_info = {
                1: "order_status",
                2: "order_id",
                3: "pay_num",
                4: "real_pay",
                5: "order_time",
                6: "pay_time",
                7: "email"
            }
            order_list = []
            temp = {}
            for html_obj in html_obj_list:
                for tr in html_obj.select("tbody")[0].select("tr"):
                    temp = {}
                    for idx, td in enumerate(tr.select("td")):
                        key_id = idx
                        if key_id in dict_order_info:
                            temp[dict_order_info[key_id]] = td.text.strip().replace("¥","").replace("元","")
                    order_list.append(temp)
            return order_list, rest_token
        except Exception:
            return [], ""

    def clear_expired_order(self):
        order_list, rest_token = self.query_order_list()
        if rest_token != "":
            for order in order_list:
                if order["order_status"] == "未支付" or order["order_status"] == "已关闭":
                    print(self.close_order(order["order_id"]))
            return True, "success"
        else:
            return False,"cookie过期"

    def close_order(self, order_id):
        order_close_url = "https://v2geek.com/udmin/orders/{}/close".format(order_id)
        order_list, rest_token = self.query_order_list()
        if rest_token != "":
            # Look up the current status of this order
            for order in order_list:
                if order["order_id"] == order_id:
                    if order["order_status"] == "已关闭":
                        self.delete_order(order_id, rest_token)
                        return True
                    elif order["order_status"] == "未支付":
                        session = requests.Session()
                        data = {
                            "_method": (None, "post"),
                            "authenticity_token": (None, rest_token),
                        }
                        res = session.post(order_close_url, data=data, headers=self.headers, verify=False)
                        close_content = str(res.content, encoding='utf8')
                        html_obj = BeautifulSoup(close_content, "html.parser")
                        # Pull the fresh CSRF token out of the response, then delete the order.
                        rest_token = ""
                        for meta in html_obj.select("meta"):
                            if meta.get("name") == "csrf-token":
                                rest_token = meta.get("content")
                                break
                        if rest_token:
                            self.delete_order(order_id, rest_token)
                            return True, "success"
                        return False, "Failed to close the order"
                    else:
                        return False, "Order is in '{}' state and cannot be closed".format(order["order_status"])
            return False, "Order {} not found".format(order_id)
        else:
            return False,"cookie过期"

    def delete_order(self, order_id, token):
        delete_url = "https://v2geek.com/udmin/orders/{}".format(order_id)
        data = {
            "_method": (None, "delete"),
            "authenticity_token": (None, token),
        }
        session = requests.Session()
        session.post(delete_url, data=data, headers=self.headers, verify=False)

    def mock_pay(self, email, coupon=None):
        """Drive the public payment page programmatically: fetch widget.js,
        extract the CSRF token and checkout URL, submit an order, and return
        (order_id, QR code image src, paying-page URL)."""
        headers = {
            "content-type": "application/json",
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36",
        }
        session = requests.Session()
        res = session.get(self.pay_js, headers=headers, verify=False)
        js = str(res.content, encoding='utf8')
        def get_pay_token(js):
            """Extract the CSRF token embedded in the widget's payment form."""
            it = re.finditer(r"value=([\s\S]*?)/>", js)
            try:
                for match in it:
                    ret = match.group()
                    token = ret.split("\"")[1].replace('\\', "")
                    if len(token) > 16:
                        return token
            except Exception:
                raise ValueError("Unexpected page format, please contact the engineer")
        
        def get_order_id(content):
            """Extract the order id from the /checkouts/<id>/status URL in the response."""
            it = re.finditer(r"/checkouts/([\s\S]*?)/status", content.decode("utf-8"))
            try:
                for match in it:
                    ret = match.group()
                    return ret.split("/")[2]
            except Exception:
                raise ValueError("Failed to parse the order id, please contact the engineer")
        
        def get_pay_url(js):
            """Extract the form action URL (the checkout endpoint) from widget.js."""
            it = re.finditer(r"action=([\s\S]*?)/>", js)
            try:
                for match in it:
                    ret = match.group()
                    reg = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
                    url_list = re.findall(reg, ret)
                    for url in url_list:
                        if len(url) > 16:
                            return url.replace('\\', "")
            except Exception:
                raise ValueError("Unexpected page format, please contact the engineer")

        token = get_pay_token(js)
        pay_url = get_pay_url(js)
        data = {
            "utf8": (None, "✓"),
            "authenticity_token": (None, token),
            "order[buyer_email]": (None, email),
            "order[coupon]": (None, coupon),
            "order[payment]": (None, "wechat"),
            "commit": (None, "提交")
        }
        try:
            r = session.post(pay_url, data=data, verify=False)
            pay_content = r.content
            order_id = get_order_id(pay_content)
            html_obj = BeautifulSoup(pay_content, "html.parser")
            back_url = "https://v2geek.com/{}/{}/checkouts/{}/paying".format(self.user_name, self.project, order_id)
            return order_id, html_obj.select('.qrcode')[0].get('src'), back_url
        except Exception:
            raise ValueError("Unexpected response format, please contact the engineer")
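
Below is a rough usage sketch of how these pieces could tie together into the automated order flow described above. The cookie value, project key, the "paid" status text, and the deliver_license() hook are illustrative assumptions, not verified details of the platform:

import time

# All placeholder values below are hypothetical.
pay = V2GeeKPay(cookies="your-_sessions-cookie", project="your-project-key")

# 1. Create an order for the buyer and show the returned WeChat QR code.
order_id, qrcode_src, paying_url = pay.mock_pay("buyer@example.com")

# 2. Poll the public status endpoint until the order is paid.
#    (The exact status string returned by the endpoint is an assumption.)
while pay.query_order_status(order_id) != "paid":
    time.sleep(3)

# 3. Look up the license code the platform assigned to this order.
ok, license_code = pay.verify_order_token(order_id)
if ok:
    deliver_license(license_code)  # hypothetical hook into the app's backend

# 4. Keep the license pool topped up and close stale unpaid orders.
pay.import_auth_token(pay.generate_auth_token(numbers=50))
pay.clear_expired_order()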

Building an RSS Reader

In this age of information overload, we are surrounded by endless streams of content, and recommendation systems keep walling us into information bubbles. We need better ways to read: to break out of those walls, push past our information boundaries, and keep shrinking the gap between ourselves and the world. I sometimes feel lost searching for better sources, such as high-quality bloggers and high-quality feeds, and the rise of paid content has made good information even harder to reach. So I have long wanted to build an RSS reader that can filter content, to enrich my own reading, improve the reading experience, and surface high-quality information faster.