-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultithreading_io.py
48 lines (36 loc) · 1.43 KB
/
multithreading_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import os
import threading
import time
def read_file_simulated(file_name):
print(f"Reading {file_name}")
print(f"Task assigned to thread: {threading.current_thread().name}")
print(f"ID of process running read task: {os.getpid()}")
time.sleep(2) # Simulate a time-consuming read operation
print(f"Finished reading {file_name}")
def read_file(file_name):
file_path = os.path.join(os.path.dirname(__file__), '..', 'resources', file_name)
relative_path = os.path.relpath(file_path, start=os.getcwd()) # The relative path from current directory
with open(file_path, 'r') as file:
first_line = file.readline().strip()
# Get the current thread object and its name
thread_name = threading.current_thread().name
print(f"Read from {relative_path}: {first_line} - Thread: {thread_name} - ProcessID: {os.getpid()}")
def thread_function():
# Files to read
files = ['file1.txt', 'file2.txt', 'file3.txt']
# Creating and starting threads for each file
threads = []
for file in files:
thread = threading.Thread(target=read_file, args=(file,))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
def main():
"""Main thread function starting."""
print("Running main thread I/O function...")
thread_function()
print(f"Threads executed.")
if __name__ == "__main__":
main()