1 minute, 6 seconds

发布时间:2024-01-16 12:04:13

下载压缩文件,上传到和iptv同一个机器上,使用 控制面板-计划任务 定时执行。



check_del_channles.rar


# 此脚本功能是链接iptv的数据库,自动测试直播源有效性,并删除无效直播源,可添加在群晖的计划任务中定时执行。

# 此脚本检测结果可能存在部分误报,请谨慎使用。

# 此脚本依赖python和FFmpeg4,请手动在套件中心安装,如使用其他版本FFmpeg,请在下文中修改ffprobe的位置。

# 此脚本需要大量的库,请手动运行一次,根据提示,安装所需的依赖库

# 此脚本需要在安装iptv套件的本地运行,请在同一台机器上执行。


代码如下:

import csv
import sys
import json
import os
import re
import traceback
import requests
import subprocess
import time
import shutil
import pymysql
import pandas as pd
from ffmpy import FFprobe
from subprocess import PIPE
from sys import stdout
from termcolor import colored, RESET
from datetime import datetime
from func_timeout import func_set_timeout, FunctionTimedOut
from requests.adapters import HTTPAdapter

dt = datetime.now()
uniqueList = []

# 此脚本功能是链接iptv的数据库,自动测试直播源有效性,并删除无效直播源,可添加在群晖的计划任务中定时执行。
# 此脚本检测结果可能存在部分误报,请谨慎使用。
# 此脚本依赖python和FFmpeg4,请手动在套件中心安装,如使用其他版本FFmpeg,请在下文中修改ffprobe的位置。
# 此脚本需要大量的库,请手动运行一次,根据提示,安装所需的依赖库
# 此脚本需要在安装iptv套件的本地运行,请在同一台机器上执行。

# 替换以下信息为你的数据库连接参数
host = 'localhost'
user = 'iptv_user'
password = 'xxxxxxxx'
database = 'iptv'

# 提取数据库中的频道信息到csv文件
def outcsv():
    conn = pymysql.connect(host=host, user=user, password=password, database=database)
    cursor = conn.cursor()
    query = "SELECT name, category, id,  url FROM iptv_channels"
    cursor.execute(query)
    result = cursor.fetchall()
    conn.close()
    columns = ['Channel', 'Group', 'Source', 'Link']
    df = pd.DataFrame(result, columns=columns)
    df.to_csv('data.csv', index=False, encoding='utf-8-sig')

# 在此处修改超时设置
@func_set_timeout(5)
def get_stream(num, clist, uri):
    try:
    # 默认ffprobe位置为ffmpeg4的位置,如果安装了其他版本,也可以修改此处的文件地址为其他版本的。
        ffprobe = FFprobe(executable='/var/packages/ffmpeg/target/bin/ffprobe',
            inputs={uri: '-v error -show_format -show_streams -print_format json'})
        cdata = json.loads(ffprobe.run(
            stdout=PIPE, stderr=PIPE)[0].decode('utf-8'))
        return cdata
    except Exception as e:
        print('[{}] {}({}) Error:{}'.format(
            str(num), clist[0], clist[2], str(e)))
        return False

def check_channel(clist, num):
    uri = clist[3]
    requests.adapters.DEFAULT_RETRIES = 3
    try:
        r = requests.get(clist[3], timeout=3)
        if r.status_code == requests.codes.ok:
            cdata = get_stream(num, clist, uri)
            if cdata:
                flagAudio = 0
                flagVideo = 0
                flagHDR = 0
                flagHEVC = 0
                vwidth = 0
                vheight = 0
                for i in cdata['streams']:
                    if i['codec_type'] == 'video':
                        flagVideo = 1
                        if 'color_space' in i:
                            if 'bt2020' in i['color_space']:
                                flagHDR = 1
                        if i['codec_name'] == 'hevc':
                            flagHEVC = 1
                        if vwidth <= i['coded_width']:
                            vwidth = i['coded_width']
                            vheight = i['coded_height']
                    elif i['codec_type'] == 'audio':
                        flagAudio = 1
                if flagAudio == 0:
                    print('[{}] {}({}) Error: Video Only!'.format(
                        str(num), clist[0], clist[2]))
                    return False
                if flagVideo == 0:
                    print('[{}] {}({}) Error: Audio Only!'.format(
                        str(num), clist[0], clist[2]))
                    return False
                if (vwidth == 0) or (vheight == 0):
                    print('[{}] {}({}) Error: {}x{}'.format(
                        str(num), clist[0], clist[2], vwidth, vheight))

                if flagHDR == 0:
                    print('[{}] {}({}) PASS: {}*{}'.format(str(num),
                          clist[0], clist[2], vwidth, vheight))
                    return [vwidth, vheight, '']
                if flagHDR == 1:
                    print('[{}] {}({}) PASS(HDR Enabled): {}*{}'.format(str(num),
                          clist[0], clist[2], vwidth, vheight))
                    return [vwidth, vheight, 'HDR']
                if flagHEVC == 1:
                    print('[{}] {}({}) PASS(HEVC Enabled): {}*{}'.format(str(num),
                          clist[0], clist[2], vwidth, vheight))
                    return [vwidth, vheight, 'HEVC']
            else:
                return False
        else:
            print('[{}] {}({}) Error:{}'.format(
                str(num), clist[0], clist[2], str(r.status_code)))
            return False
    except Exception as e:
        print('[{}] {}({}) Error:{}'.format(
            str(num), clist[0], clist[2], str(e)))
        return False

def print_info():
    print('Time: {}-{}-{} {}:{}'.format(dt.year,
          dt.month, dt.day, dt.hour, dt.minute))

def rm_files(target, selection):
    if selection == 1:
        try:
            shutil.rmtree(target)
        except OSError:
            pass
        try:
            os.mkdir(target)
        except OSError:
            pass
    else:
        try:
            os.remove(target)
        except OSError:
            pass

def getdes(st):
    if st:
        return '[{}]'.format(st)
    else:
        return ''

# 导出测试失败列表
def main():
    print_info()
    Total = 0
    Total_failed = 0
    fulltimes = '-{}{}{}{}{}'.format(dt.year,
                                     dt.month, dt.day, dt.hour, dt.minute)
    times = ''
    rm_files('failed.txt', 2)
    with open('data.csv') as f:
        f_csv = csv.reader(f)
        headers = next(f_csv)
        num = 1
        failed_channels = []

        for row in f_csv:
            try:
                if row[3] in uniqueList:
                    ret = False
                else:
                    uniqueList.append(row[3])
                    ret = check_channel(row, num)
            except FunctionTimedOut as e:
                print('[{}] {}({}) Error:{}'.format(
                    str(num), row[0], row[2], str(e)))
                ret = False

            if ret:
                Total += 1
            else:
                failed_channels.append(row[3])
                Total_failed += 1
            num += 1
            time.sleep(0.25)

        with open('failed.txt', 'w') as failed_file:
            for failed_channel in failed_channels:
                failed_file.write(f'{failed_channel}\n')

    print('有效数量: {}'.format(Total))
    print('无效数量: {}'.format(Total_failed))

# 删除测试失败链接
def delchannel():
    conn = pymysql.connect(host=host, user=user, password=password, database=database)
    cursor = conn.cursor()
    # 从failed.txt文件中读取URLs
    with open('failed.txt', 'r') as file:
        urls = file.read().splitlines()
    # 删除对应的记录
    for url in urls:
        try:
            # 使用参数化查询防止SQL注入
            delete_query = "DELETE FROM iptv_channels WHERE url = %s"
            cursor.execute(delete_query, (url,))
            conn.commit()
            print(f"'{url}' 已从数据库删除.")
        except Exception as e:
            print(f"删除失败: '{url}': {e}")
            conn.rollback()

    # 关闭数据库连接
    cursor.close()
    conn.close()
    
    # 删除data.csv文件
    csv_file_path = 'data.csv'
    try:
        os.remove(csv_file_path)
        print(f"临时文件:{csv_file_path} 已清除。")
    except FileNotFoundError:
        print(f"{csv_file_path} 文件不存在,无需删除。")
    except Exception as e:
        print(f"删除 {csv_file_path} 文件时出错:{e}")

if __name__ == '__main__':
    outcsv();
    main();
    delchannel();